From 2b3238786472bda99bdf23cb5fa333d638af9adc Mon Sep 17 00:00:00 2001 From: l3ocho Date: Sun, 8 Feb 2026 15:41:46 -0500 Subject: [PATCH] feat: scaffold gitea-mcp package with module rename MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Copied source from leo-claude-mktplace/mcp-servers/gitea/ v1.0.0 - Renamed module: mcp_server → gitea_mcp (all imports updated) - Created pyproject.toml for standalone package identity - Preserved all existing tools and test suite - MCP SDK imports unchanged Co-Authored-By: Claude Opus 4.6 --- .gitignore | 12 + gitea_mcp/__init__.py | 23 + gitea_mcp/config.py | 227 ++++++ gitea_mcp/gitea_client.py | 849 +++++++++++++++++++++++ gitea_mcp/server.py | 93 +++ gitea_mcp/tool_registry.py | 1098 ++++++++++++++++++++++++++++++ gitea_mcp/tools/__init__.py | 11 + gitea_mcp/tools/dependencies.py | 216 ++++++ gitea_mcp/tools/issues.py | 287 ++++++++ gitea_mcp/tools/labels.py | 377 ++++++++++ gitea_mcp/tools/milestones.py | 145 ++++ gitea_mcp/tools/pull_requests.py | 335 +++++++++ gitea_mcp/tools/wiki.py | 187 +++++ pyproject.toml | 40 ++ requirements.txt | 4 + tests/__init__.py | 0 tests/test_config.py | 259 +++++++ tests/test_gitea_client.py | 270 ++++++++ tests/test_issues.py | 163 +++++ tests/test_labels.py | 478 +++++++++++++ 20 files changed, 5074 insertions(+) create mode 100644 .gitignore create mode 100644 gitea_mcp/__init__.py create mode 100644 gitea_mcp/config.py create mode 100644 gitea_mcp/gitea_client.py create mode 100644 gitea_mcp/server.py create mode 100644 gitea_mcp/tool_registry.py create mode 100644 gitea_mcp/tools/__init__.py create mode 100644 gitea_mcp/tools/dependencies.py create mode 100644 gitea_mcp/tools/issues.py create mode 100644 gitea_mcp/tools/labels.py create mode 100644 gitea_mcp/tools/milestones.py create mode 100644 gitea_mcp/tools/pull_requests.py create mode 100644 gitea_mcp/tools/wiki.py create mode 100644 pyproject.toml create mode 100644 requirements.txt create mode 100644 tests/__init__.py create mode 100644 tests/test_config.py create mode 100644 tests/test_gitea_client.py create mode 100644 tests/test_issues.py create mode 100644 tests/test_labels.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..6fab5a7 --- /dev/null +++ b/.gitignore @@ -0,0 +1,12 @@ +__pycache__/ +*.pyc +*.pyo +*.egg-info/ +dist/ +build/ +.venv/ +.env +*.egg +.pytest_cache/ +.coverage +htmlcov/ diff --git a/gitea_mcp/__init__.py b/gitea_mcp/__init__.py new file mode 100644 index 0000000..87bfcfa --- /dev/null +++ b/gitea_mcp/__init__.py @@ -0,0 +1,23 @@ +""" +gitea-mcp: MCP server for Gitea integration. + +Provides 39 MCP tools for issue management, PRs, wikis, milestones, +dependencies, labels, and merge operations via Gitea API. + +Public API for external consumers: + from gitea_mcp import get_tool_definitions, create_tool_dispatcher, GiteaClient, GiteaConfig +""" + +__version__ = "1.0.0" + +from .tool_registry import get_tool_definitions, create_tool_dispatcher +from .gitea_client import GiteaClient +from .config import GiteaConfig + +__all__ = [ + "__version__", + "get_tool_definitions", + "create_tool_dispatcher", + "GiteaClient", + "GiteaConfig", +] diff --git a/gitea_mcp/config.py b/gitea_mcp/config.py new file mode 100644 index 0000000..a857a0a --- /dev/null +++ b/gitea_mcp/config.py @@ -0,0 +1,227 @@ +""" +Configuration loader for Gitea MCP Server. + +Implements hybrid configuration system: +- System-level: ~/.config/claude/gitea.env (credentials) +- Project-level: .env (repository specification) +- Auto-detection: Falls back to git remote URL parsing +""" +from pathlib import Path +from dotenv import load_dotenv +import os +import re +import subprocess +import logging +from typing import Dict, Optional + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +class GiteaConfig: + """Hybrid configuration loader with mode detection""" + + def __init__(self): + self.api_url: Optional[str] = None + self.api_token: Optional[str] = None + self.repo: Optional[str] = None + self.mode: str = 'project' + + def load(self) -> Dict[str, Optional[str]]: + """ + Load configuration from system and project levels. + Project-level configuration overrides system-level. + + Returns: + Dict containing api_url, api_token, repo, mode + + Raises: + FileNotFoundError: If system config is missing + ValueError: If required configuration is missing + """ + # Load system config + system_config = Path.home() / '.config' / 'claude' / 'gitea.env' + if system_config.exists(): + load_dotenv(system_config) + logger.info(f"Loaded system configuration from {system_config}") + else: + raise FileNotFoundError( + f"System config not found: {system_config}\n" + "Create it with: mkdir -p ~/.config/claude && " + "cat > ~/.config/claude/gitea.env" + ) + + # Find project directory (MCP server cwd is plugin dir, not project dir) + project_dir = self._find_project_directory() + + # Load project config (overrides system) + if project_dir: + project_config = project_dir / '.env' + if project_config.exists(): + load_dotenv(project_config, override=True) + logger.info(f"Loaded project configuration from {project_config}") + + # Extract values + self.api_url = os.getenv('GITEA_API_URL') + self.api_token = os.getenv('GITEA_API_TOKEN') + self.repo = os.getenv('GITEA_REPO') # Optional, must be owner/repo format + + # Auto-detect repo from git remote if not specified + if not self.repo and project_dir: + self.repo = self._detect_repo_from_git(project_dir) + if self.repo: + logger.info(f"Auto-detected repository from git remote: {self.repo}") + + # Detect mode + if self.repo: + self.mode = 'project' + logger.info(f"Running in project mode: {self.repo}") + else: + self.mode = 'company' + logger.info("Running in company-wide mode (PMO)") + + # Validate required variables + self._validate() + + return { + 'api_url': self.api_url, + 'api_token': self.api_token, + 'repo': self.repo, + 'mode': self.mode + } + + def _validate(self) -> None: + """ + Validate that required configuration is present. + + Raises: + ValueError: If required configuration is missing + """ + required = { + 'GITEA_API_URL': self.api_url, + 'GITEA_API_TOKEN': self.api_token + } + + missing = [key for key, value in required.items() if not value] + + if missing: + raise ValueError( + f"Missing required configuration: {', '.join(missing)}\n" + "Check your ~/.config/claude/gitea.env file" + ) + + def _find_project_directory(self) -> Optional[Path]: + """ + Find the user's project directory. + + The MCP server runs with cwd set to the plugin directory, not the + user's project. We need to find the actual project directory using + various heuristics. + + Returns: + Path to project directory, or None if not found + """ + # Strategy 1: Check CLAUDE_PROJECT_DIR environment variable + project_dir = os.getenv('CLAUDE_PROJECT_DIR') + if project_dir: + path = Path(project_dir) + if path.exists(): + logger.info(f"Found project directory from CLAUDE_PROJECT_DIR: {path}") + return path + + # Strategy 2: Check PWD (original working directory before cwd override) + pwd = os.getenv('PWD') + if pwd: + path = Path(pwd) + # Verify it has .git or .env (indicates a project) + if path.exists() and ((path / '.git').exists() or (path / '.env').exists()): + logger.info(f"Found project directory from PWD: {path}") + return path + + # Strategy 3: Check current working directory + # This handles test scenarios and cases where cwd is actually the project + cwd = Path.cwd() + if (cwd / '.git').exists() or (cwd / '.env').exists(): + logger.info(f"Found project directory from cwd: {cwd}") + return cwd + + # Strategy 4: Check if GITEA_REPO is already set (user configured it) + # If so, we don't need to find the project directory for git detection + if os.getenv('GITEA_REPO'): + logger.debug("GITEA_REPO already set, skipping project directory detection") + return None + + logger.debug("Could not determine project directory") + return None + + def _detect_repo_from_git(self, project_dir: Optional[Path] = None) -> Optional[str]: + """ + Auto-detect repository from git remote origin URL. + + Args: + project_dir: Directory to run git command from (defaults to cwd) + + Supports URL formats: + - SSH: ssh://git@host:port/owner/repo.git + - SSH short: git@host:owner/repo.git + - HTTPS: https://host/owner/repo.git + - HTTP: http://host/owner/repo.git + + Returns: + Repository in 'owner/repo' format, or None if detection fails + """ + try: + result = subprocess.run( + ['git', 'remote', 'get-url', 'origin'], + capture_output=True, + text=True, + timeout=5, + cwd=str(project_dir) if project_dir else None + ) + if result.returncode != 0: + logger.debug("No git remote 'origin' found") + return None + + url = result.stdout.strip() + return self._parse_git_url(url) + + except subprocess.TimeoutExpired: + logger.warning("Git command timed out") + return None + except FileNotFoundError: + logger.debug("Git not available") + return None + except Exception as e: + logger.debug(f"Failed to detect repo from git: {e}") + return None + + def _parse_git_url(self, url: str) -> Optional[str]: + """ + Parse git URL to extract owner/repo. + + Args: + url: Git remote URL + + Returns: + Repository in 'owner/repo' format, or None if parsing fails + """ + # Remove .git suffix if present + url = re.sub(r'\.git$', '', url) + + # SSH format: ssh://git@host:port/owner/repo + ssh_match = re.match(r'ssh://[^/]+/(.+/.+)$', url) + if ssh_match: + return ssh_match.group(1) + + # SSH short format: git@host:owner/repo + ssh_short_match = re.match(r'git@[^:]+:(.+/.+)$', url) + if ssh_short_match: + return ssh_short_match.group(1) + + # HTTPS/HTTP format: https://host/owner/repo + http_match = re.match(r'https?://[^/]+/(.+/.+)$', url) + if http_match: + return http_match.group(1) + + logger.warning(f"Could not parse git URL: {url}") + return None diff --git a/gitea_mcp/gitea_client.py b/gitea_mcp/gitea_client.py new file mode 100644 index 0000000..e67ea7f --- /dev/null +++ b/gitea_mcp/gitea_client.py @@ -0,0 +1,849 @@ +""" +Gitea API client for interacting with Gitea API. + +Provides synchronous methods for: +- Issue CRUD operations +- Label management +- Repository operations +- PMO multi-repo aggregation +- Wiki operations (lessons learned) +- Milestone management +- Issue dependencies +""" +import requests +import logging +import re +from typing import List, Dict, Optional +from .config import GiteaConfig + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +class GiteaClient: + """Client for interacting with Gitea API""" + + def __init__(self): + """Initialize Gitea client with configuration""" + config = GiteaConfig() + config_dict = config.load() + + self.base_url = config_dict['api_url'] + self.token = config_dict['api_token'] + self.repo = config_dict.get('repo') # Optional default repo in owner/repo format + self.mode = config_dict['mode'] + + self.session = requests.Session() + self.session.headers.update({ + 'Authorization': f'token {self.token}', + 'Content-Type': 'application/json' + }) + + logger.info(f"Gitea client initialized in {self.mode} mode") + + def _parse_repo(self, repo: Optional[str] = None) -> tuple: + """Parse owner/repo from input. Always requires 'owner/repo' format.""" + target = repo or self.repo + if not target or '/' not in target: + raise ValueError("Use 'owner/repo' format (e.g. 'org/repo-name')") + parts = target.split('/', 1) + return parts[0], parts[1] + + def list_issues( + self, + state: str = 'open', + labels: Optional[List[str]] = None, + milestone: Optional[str] = None, + repo: Optional[str] = None + ) -> List[Dict]: + """ + List issues from Gitea repository. + + Args: + state: Issue state (open, closed, all) + labels: Filter by labels + milestone: Filter by milestone title (exact match) + repo: Repository in 'owner/repo' format + + Returns: + List of issue dictionaries + """ + owner, target_repo = self._parse_repo(repo) + url = f"{self.base_url}/repos/{owner}/{target_repo}/issues" + params = {'state': state} + if labels: + params['labels'] = ','.join(labels) + if milestone: + params['milestones'] = milestone + logger.info(f"Listing issues from {owner}/{target_repo} with state={state}") + response = self.session.get(url, params=params) + response.raise_for_status() + return response.json() + + def get_issue( + self, + issue_number: int, + repo: Optional[str] = None + ) -> Dict: + """Get specific issue details.""" + owner, target_repo = self._parse_repo(repo) + url = f"{self.base_url}/repos/{owner}/{target_repo}/issues/{issue_number}" + logger.info(f"Getting issue #{issue_number} from {owner}/{target_repo}") + response = self.session.get(url) + response.raise_for_status() + return response.json() + + def create_issue( + self, + title: str, + body: str, + labels: Optional[List[str]] = None, + repo: Optional[str] = None + ) -> Dict: + """Create a new issue in Gitea.""" + owner, target_repo = self._parse_repo(repo) + url = f"{self.base_url}/repos/{owner}/{target_repo}/issues" + data = {'title': title, 'body': body} + if labels: + label_ids = self._resolve_label_ids(labels, owner, target_repo) + data['labels'] = label_ids + logger.info(f"Creating issue in {owner}/{target_repo}: {title}") + response = self.session.post(url, json=data) + response.raise_for_status() + return response.json() + + def _resolve_label_ids(self, label_names: List[str], owner: str, repo: str) -> List[int]: + """Convert label names to label IDs.""" + full_repo = f"{owner}/{repo}" + + # Only fetch org labels if repo belongs to an organization + org_labels = [] + if self.is_org_repo(full_repo): + org_labels = self.get_org_labels(owner) + + repo_labels = self.get_labels(full_repo) + all_labels = org_labels + repo_labels + label_map = {label['name']: label['id'] for label in all_labels} + label_ids = [] + for name in label_names: + if name in label_map: + label_ids.append(label_map[name]) + else: + logger.warning(f"Label '{name}' not found, skipping") + return label_ids + + def update_issue( + self, + issue_number: int, + title: Optional[str] = None, + body: Optional[str] = None, + state: Optional[str] = None, + labels: Optional[List[str]] = None, + milestone: Optional[int] = None, + repo: Optional[str] = None + ) -> Dict: + """ + Update existing issue. + + Args: + issue_number: Issue number to update + title: New title (optional) + body: New body (optional) + state: New state - 'open' or 'closed' (optional) + labels: New labels (optional) + milestone: Milestone ID to assign (optional) + repo: Repository in 'owner/repo' format + + Returns: + Updated issue dictionary + """ + owner, target_repo = self._parse_repo(repo) + url = f"{self.base_url}/repos/{owner}/{target_repo}/issues/{issue_number}" + data = {} + if title is not None: + data['title'] = title + if body is not None: + data['body'] = body + if state is not None: + data['state'] = state + if labels is not None: + data['labels'] = labels + if milestone is not None: + data['milestone'] = milestone + logger.info(f"Updating issue #{issue_number} in {owner}/{target_repo}") + response = self.session.patch(url, json=data) + response.raise_for_status() + return response.json() + + def add_comment( + self, + issue_number: int, + comment: str, + repo: Optional[str] = None + ) -> Dict: + """Add comment to issue. Repo must be 'owner/repo' format.""" + owner, target_repo = self._parse_repo(repo) + url = f"{self.base_url}/repos/{owner}/{target_repo}/issues/{issue_number}/comments" + data = {'body': comment} + logger.info(f"Adding comment to issue #{issue_number} in {owner}/{target_repo}") + response = self.session.post(url, json=data) + response.raise_for_status() + return response.json() + + def get_labels(self, repo: Optional[str] = None) -> List[Dict]: + """Get all labels from repository. Repo must be 'owner/repo' format.""" + owner, target_repo = self._parse_repo(repo) + url = f"{self.base_url}/repos/{owner}/{target_repo}/labels" + logger.info(f"Getting labels from {owner}/{target_repo}") + response = self.session.get(url) + response.raise_for_status() + return response.json() + + def get_org_labels(self, org: str) -> List[Dict]: + """Get organization-level labels. Org is the organization name.""" + url = f"{self.base_url}/orgs/{org}/labels" + logger.info(f"Getting organization labels for {org}") + response = self.session.get(url) + response.raise_for_status() + return response.json() + + def list_repos(self, org: str) -> List[Dict]: + """List all repositories in organization. Org is the organization name.""" + url = f"{self.base_url}/orgs/{org}/repos" + logger.info(f"Listing all repositories for organization {org}") + response = self.session.get(url) + response.raise_for_status() + return response.json() + + def aggregate_issues( + self, + org: str, + state: str = 'open', + labels: Optional[List[str]] = None + ) -> Dict[str, List[Dict]]: + """Fetch issues across all repositories in org.""" + repos = self.list_repos(org) + aggregated = {} + logger.info(f"Aggregating issues across {len(repos)} repositories") + for repo in repos: + repo_name = repo['name'] + try: + issues = self.list_issues( + state=state, + labels=labels, + repo=f"{org}/{repo_name}" + ) + if issues: + aggregated[repo_name] = issues + logger.info(f"Found {len(issues)} issues in {repo_name}") + except Exception as e: + logger.error(f"Error fetching issues from {repo_name}: {e}") + + return aggregated + + # ======================================== + # WIKI OPERATIONS (Lessons Learned) + # ======================================== + + def list_wiki_pages(self, repo: Optional[str] = None) -> List[Dict]: + """List all wiki pages in repository.""" + owner, target_repo = self._parse_repo(repo) + url = f"{self.base_url}/repos/{owner}/{target_repo}/wiki/pages" + logger.info(f"Listing wiki pages from {owner}/{target_repo}") + response = self.session.get(url) + response.raise_for_status() + return response.json() + + def get_wiki_page( + self, + page_name: str, + repo: Optional[str] = None + ) -> Dict: + """Get a specific wiki page by name.""" + from urllib.parse import quote + owner, target_repo = self._parse_repo(repo) + # URL-encode the page_name to handle special characters like ':' + encoded_page_name = quote(page_name, safe='') + url = f"{self.base_url}/repos/{owner}/{target_repo}/wiki/page/{encoded_page_name}" + logger.info(f"Getting wiki page '{page_name}' from {owner}/{target_repo}") + response = self.session.get(url) + response.raise_for_status() + return response.json() + + def create_wiki_page( + self, + title: str, + content: str, + repo: Optional[str] = None + ) -> Dict: + """Create a new wiki page.""" + owner, target_repo = self._parse_repo(repo) + url = f"{self.base_url}/repos/{owner}/{target_repo}/wiki/new" + data = { + 'title': title, + 'content_base64': self._encode_base64(content) + } + logger.info(f"Creating wiki page '{title}' in {owner}/{target_repo}") + response = self.session.post(url, json=data) + response.raise_for_status() + return response.json() + + def update_wiki_page( + self, + page_name: str, + content: str, + repo: Optional[str] = None + ) -> Dict: + """Update an existing wiki page.""" + from urllib.parse import quote + owner, target_repo = self._parse_repo(repo) + # URL-encode the page_name to handle special characters like ':' + encoded_page_name = quote(page_name, safe='') + url = f"{self.base_url}/repos/{owner}/{target_repo}/wiki/page/{encoded_page_name}" + data = { + 'title': page_name, # CRITICAL: include title to preserve page name + 'content_base64': self._encode_base64(content) + } + logger.info(f"Updating wiki page '{page_name}' in {owner}/{target_repo}") + response = self.session.patch(url, json=data) + response.raise_for_status() + return response.json() + + def delete_wiki_page( + self, + page_name: str, + repo: Optional[str] = None + ) -> bool: + """Delete a wiki page.""" + from urllib.parse import quote + owner, target_repo = self._parse_repo(repo) + # URL-encode the page_name to handle special characters like ':' + encoded_page_name = quote(page_name, safe='') + url = f"{self.base_url}/repos/{owner}/{target_repo}/wiki/page/{encoded_page_name}" + logger.info(f"Deleting wiki page '{page_name}' from {owner}/{target_repo}") + response = self.session.delete(url) + response.raise_for_status() + return True + + def _encode_base64(self, content: str) -> str: + """Encode content to base64 for wiki API.""" + import base64 + return base64.b64encode(content.encode('utf-8')).decode('utf-8') + + def _decode_base64(self, content: str) -> str: + """Decode base64 content from wiki API.""" + import base64 + return base64.b64decode(content.encode('utf-8')).decode('utf-8') + + def search_wiki_pages( + self, + query: str, + repo: Optional[str] = None + ) -> List[Dict]: + """Search wiki pages by content (client-side filtering).""" + pages = self.list_wiki_pages(repo) + results = [] + query_lower = query.lower() + for page in pages: + if query_lower in page.get('title', '').lower(): + results.append(page) + return results + + def create_lesson( + self, + title: str, + content: str, + tags: List[str], + category: str = "sprints", + repo: Optional[str] = None + ) -> Dict: + """Create a lessons learned entry in the wiki.""" + # Sanitize title for wiki page name + page_name = f"lessons/{category}/{self._sanitize_page_name(title)}" + + # Add tags as metadata at the end of content + full_content = f"{content}\n\n---\n**Tags:** {', '.join(tags)}" + + return self.create_wiki_page(page_name, full_content, repo) + + def search_lessons( + self, + query: Optional[str] = None, + tags: Optional[List[str]] = None, + repo: Optional[str] = None + ) -> List[Dict]: + """Search lessons learned by query and/or tags.""" + pages = self.list_wiki_pages(repo) + results = [] + + for page in pages: + title = page.get('title', '') + # Filter to only lessons (pages starting with lessons/) + if not title.startswith('lessons/'): + continue + + # If query provided, check if it matches title + if query: + if query.lower() not in title.lower(): + continue + + # Get full page content for tag matching if tags provided + if tags: + try: + full_page = self.get_wiki_page(title, repo) + content = self._decode_base64(full_page.get('content_base64', '')) + # Check if any tag is in the content + if not any(tag.lower() in content.lower() for tag in tags): + continue + except Exception: + continue + + results.append(page) + + return results + + def _sanitize_page_name(self, title: str) -> str: + """Convert title to valid wiki page name.""" + # Replace spaces with hyphens, remove special chars + name = re.sub(r'[^\w\s-]', '', title) + name = re.sub(r'[\s]+', '-', name) + return name.lower() + + # ======================================== + # MILESTONE OPERATIONS + # ======================================== + + def list_milestones( + self, + state: str = 'open', + repo: Optional[str] = None + ) -> List[Dict]: + """List all milestones in repository.""" + owner, target_repo = self._parse_repo(repo) + url = f"{self.base_url}/repos/{owner}/{target_repo}/milestones" + params = {'state': state} + logger.info(f"Listing milestones from {owner}/{target_repo}") + response = self.session.get(url, params=params) + response.raise_for_status() + return response.json() + + def get_milestone( + self, + milestone_id: int, + repo: Optional[str] = None + ) -> Dict: + """Get a specific milestone by ID.""" + owner, target_repo = self._parse_repo(repo) + url = f"{self.base_url}/repos/{owner}/{target_repo}/milestones/{milestone_id}" + logger.info(f"Getting milestone #{milestone_id} from {owner}/{target_repo}") + response = self.session.get(url) + response.raise_for_status() + return response.json() + + def create_milestone( + self, + title: str, + description: Optional[str] = None, + due_on: Optional[str] = None, + repo: Optional[str] = None + ) -> Dict: + """Create a new milestone.""" + owner, target_repo = self._parse_repo(repo) + url = f"{self.base_url}/repos/{owner}/{target_repo}/milestones" + data = {'title': title} + if description: + data['description'] = description + if due_on: + data['due_on'] = due_on + logger.info(f"Creating milestone '{title}' in {owner}/{target_repo}") + response = self.session.post(url, json=data) + response.raise_for_status() + return response.json() + + def update_milestone( + self, + milestone_id: int, + title: Optional[str] = None, + description: Optional[str] = None, + state: Optional[str] = None, + due_on: Optional[str] = None, + repo: Optional[str] = None + ) -> Dict: + """Update an existing milestone.""" + owner, target_repo = self._parse_repo(repo) + url = f"{self.base_url}/repos/{owner}/{target_repo}/milestones/{milestone_id}" + data = {} + if title is not None: + data['title'] = title + if description is not None: + data['description'] = description + if state is not None: + data['state'] = state + if due_on is not None: + data['due_on'] = due_on + logger.info(f"Updating milestone #{milestone_id} in {owner}/{target_repo}") + response = self.session.patch(url, json=data) + response.raise_for_status() + return response.json() + + def delete_milestone( + self, + milestone_id: int, + repo: Optional[str] = None + ) -> bool: + """Delete a milestone.""" + owner, target_repo = self._parse_repo(repo) + url = f"{self.base_url}/repos/{owner}/{target_repo}/milestones/{milestone_id}" + logger.info(f"Deleting milestone #{milestone_id} from {owner}/{target_repo}") + response = self.session.delete(url) + response.raise_for_status() + return True + + # ======================================== + # ISSUE DEPENDENCY OPERATIONS + # ======================================== + + def list_issue_dependencies( + self, + issue_number: int, + repo: Optional[str] = None + ) -> List[Dict]: + """List all dependencies for an issue (issues that block this one).""" + owner, target_repo = self._parse_repo(repo) + url = f"{self.base_url}/repos/{owner}/{target_repo}/issues/{issue_number}/dependencies" + logger.info(f"Listing dependencies for issue #{issue_number} in {owner}/{target_repo}") + response = self.session.get(url) + response.raise_for_status() + return response.json() + + def create_issue_dependency( + self, + issue_number: int, + depends_on: int, + repo: Optional[str] = None + ) -> Dict: + """Create a dependency (issue_number depends on depends_on).""" + owner, target_repo = self._parse_repo(repo) + url = f"{self.base_url}/repos/{owner}/{target_repo}/issues/{issue_number}/dependencies" + data = { + 'dependentIssue': { + 'owner': owner, + 'repo': target_repo, + 'index': depends_on + } + } + logger.info(f"Creating dependency: #{issue_number} depends on #{depends_on} in {owner}/{target_repo}") + response = self.session.post(url, json=data) + response.raise_for_status() + return response.json() + + def remove_issue_dependency( + self, + issue_number: int, + depends_on: int, + repo: Optional[str] = None + ) -> bool: + """Remove a dependency between issues.""" + owner, target_repo = self._parse_repo(repo) + url = f"{self.base_url}/repos/{owner}/{target_repo}/issues/{issue_number}/dependencies" + data = { + 'dependentIssue': { + 'owner': owner, + 'repo': target_repo, + 'index': depends_on + } + } + logger.info(f"Removing dependency: #{issue_number} no longer depends on #{depends_on}") + response = self.session.delete(url, json=data) + response.raise_for_status() + return True + + def list_issue_blocks( + self, + issue_number: int, + repo: Optional[str] = None + ) -> List[Dict]: + """List all issues that this issue blocks.""" + owner, target_repo = self._parse_repo(repo) + url = f"{self.base_url}/repos/{owner}/{target_repo}/issues/{issue_number}/blocks" + logger.info(f"Listing issues blocked by #{issue_number} in {owner}/{target_repo}") + response = self.session.get(url) + response.raise_for_status() + return response.json() + + # ======================================== + # REPOSITORY VALIDATION + # ======================================== + + def get_repo_info(self, repo: Optional[str] = None) -> Dict: + """Get repository information including owner type.""" + owner, target_repo = self._parse_repo(repo) + url = f"{self.base_url}/repos/{owner}/{target_repo}" + logger.info(f"Getting repo info for {owner}/{target_repo}") + response = self.session.get(url) + response.raise_for_status() + return response.json() + + def is_org_repo(self, repo: Optional[str] = None) -> bool: + """ + Check if repository belongs to an organization (not a user). + + Uses the /orgs/{owner} endpoint to reliably detect organizations, + as the owner.type field in repo info may be null in some Gitea versions. + """ + owner, _ = self._parse_repo(repo) + return self._is_organization(owner) + + def _is_organization(self, owner: str) -> bool: + """ + Check if an owner is an organization by querying the orgs endpoint. + + Args: + owner: The owner name to check + + Returns: + True if owner is an organization, False if user or unknown + """ + url = f"{self.base_url}/orgs/{owner}" + try: + response = self.session.get(url) + # 200 = organization exists, 404 = not an organization (user account) + return response.status_code == 200 + except Exception as e: + logger.warning(f"Failed to check if {owner} is organization: {e}") + return False + + def get_branch_protection( + self, + branch: str, + repo: Optional[str] = None + ) -> Optional[Dict]: + """Get branch protection rules for a branch.""" + owner, target_repo = self._parse_repo(repo) + url = f"{self.base_url}/repos/{owner}/{target_repo}/branch_protections/{branch}" + logger.info(f"Getting branch protection for {branch} in {owner}/{target_repo}") + try: + response = self.session.get(url) + response.raise_for_status() + return response.json() + except requests.exceptions.HTTPError as e: + if e.response.status_code == 404: + return None # No protection rules + raise + + def create_label( + self, + name: str, + color: str, + description: Optional[str] = None, + repo: Optional[str] = None + ) -> Dict: + """Create a new label in the repository.""" + owner, target_repo = self._parse_repo(repo) + url = f"{self.base_url}/repos/{owner}/{target_repo}/labels" + data = { + 'name': name, + 'color': color.lstrip('#') # Remove # if present + } + if description: + data['description'] = description + logger.info(f"Creating label '{name}' in {owner}/{target_repo}") + response = self.session.post(url, json=data) + response.raise_for_status() + return response.json() + + def create_org_label( + self, + org: str, + name: str, + color: str, + description: Optional[str] = None + ) -> Dict: + """ + Create a new label at the organization level. + + Organization labels are shared across all repositories in the org. + Use this for workflow labels (Type, Priority, Complexity, Effort, etc.) + + Args: + org: Organization name + name: Label name (e.g., 'Type/Bug', 'Priority/High') + color: Hex color code (with or without #) + description: Optional label description + + Returns: + Created label dictionary + """ + url = f"{self.base_url}/orgs/{org}/labels" + data = { + 'name': name, + 'color': color.lstrip('#') # Remove # if present + } + if description: + data['description'] = description + logger.info(f"Creating organization label '{name}' in {org}") + response = self.session.post(url, json=data) + response.raise_for_status() + return response.json() + + # ======================================== + # PULL REQUEST OPERATIONS + # ======================================== + + def list_pull_requests( + self, + state: str = 'open', + sort: str = 'recentupdate', + labels: Optional[List[str]] = None, + repo: Optional[str] = None + ) -> List[Dict]: + """ + List pull requests from Gitea repository. + + Args: + state: PR state (open, closed, all) + sort: Sort order (oldest, recentupdate, leastupdate, mostcomment, leastcomment, priority) + labels: Filter by labels + repo: Repository in 'owner/repo' format + + Returns: + List of pull request dictionaries + """ + owner, target_repo = self._parse_repo(repo) + url = f"{self.base_url}/repos/{owner}/{target_repo}/pulls" + params = {'state': state, 'sort': sort} + if labels: + params['labels'] = ','.join(labels) + logger.info(f"Listing PRs from {owner}/{target_repo} with state={state}") + response = self.session.get(url, params=params) + response.raise_for_status() + return response.json() + + def get_pull_request( + self, + pr_number: int, + repo: Optional[str] = None + ) -> Dict: + """Get specific pull request details.""" + owner, target_repo = self._parse_repo(repo) + url = f"{self.base_url}/repos/{owner}/{target_repo}/pulls/{pr_number}" + logger.info(f"Getting PR #{pr_number} from {owner}/{target_repo}") + response = self.session.get(url) + response.raise_for_status() + return response.json() + + def get_pr_diff( + self, + pr_number: int, + repo: Optional[str] = None + ) -> str: + """Get the diff for a pull request.""" + owner, target_repo = self._parse_repo(repo) + url = f"{self.base_url}/repos/{owner}/{target_repo}/pulls/{pr_number}.diff" + logger.info(f"Getting diff for PR #{pr_number} from {owner}/{target_repo}") + response = self.session.get(url) + response.raise_for_status() + return response.text + + def get_pr_comments( + self, + pr_number: int, + repo: Optional[str] = None + ) -> List[Dict]: + """Get comments on a pull request (uses issue comments endpoint).""" + owner, target_repo = self._parse_repo(repo) + # PRs share comment endpoint with issues in Gitea + url = f"{self.base_url}/repos/{owner}/{target_repo}/issues/{pr_number}/comments" + logger.info(f"Getting comments for PR #{pr_number} from {owner}/{target_repo}") + response = self.session.get(url) + response.raise_for_status() + return response.json() + + def create_pr_review( + self, + pr_number: int, + body: str, + event: str = 'COMMENT', + comments: Optional[List[Dict]] = None, + repo: Optional[str] = None + ) -> Dict: + """ + Create a review on a pull request. + + Args: + pr_number: Pull request number + body: Review body/summary + event: Review action (APPROVE, REQUEST_CHANGES, COMMENT) + comments: Optional list of inline comments with path, position, body + repo: Repository in 'owner/repo' format + + Returns: + Created review dictionary + """ + owner, target_repo = self._parse_repo(repo) + url = f"{self.base_url}/repos/{owner}/{target_repo}/pulls/{pr_number}/reviews" + data = { + 'body': body, + 'event': event + } + if comments: + data['comments'] = comments + logger.info(f"Creating review on PR #{pr_number} in {owner}/{target_repo}") + response = self.session.post(url, json=data) + response.raise_for_status() + return response.json() + + def add_pr_comment( + self, + pr_number: int, + body: str, + repo: Optional[str] = None + ) -> Dict: + """Add a general comment to a pull request (uses issue comment endpoint).""" + owner, target_repo = self._parse_repo(repo) + # PRs share comment endpoint with issues in Gitea + url = f"{self.base_url}/repos/{owner}/{target_repo}/issues/{pr_number}/comments" + data = {'body': body} + logger.info(f"Adding comment to PR #{pr_number} in {owner}/{target_repo}") + response = self.session.post(url, json=data) + response.raise_for_status() + return response.json() + + def create_pull_request( + self, + title: str, + body: str, + head: str, + base: str, + labels: Optional[List[str]] = None, + repo: Optional[str] = None + ) -> Dict: + """ + Create a new pull request. + + Args: + title: PR title + body: PR description/body + head: Source branch name (the branch with changes) + base: Target branch name (the branch to merge into) + labels: Optional list of label names + repo: Repository in 'owner/repo' format + + Returns: + Created pull request dictionary + """ + owner, target_repo = self._parse_repo(repo) + url = f"{self.base_url}/repos/{owner}/{target_repo}/pulls" + data = { + 'title': title, + 'body': body, + 'head': head, + 'base': base + } + if labels: + label_ids = self._resolve_label_ids(labels, owner, target_repo) + data['labels'] = label_ids + logger.info(f"Creating PR '{title}' in {owner}/{target_repo}: {head} -> {base}") + response = self.session.post(url, json=data) + response.raise_for_status() + return response.json() diff --git a/gitea_mcp/server.py b/gitea_mcp/server.py new file mode 100644 index 0000000..2d07417 --- /dev/null +++ b/gitea_mcp/server.py @@ -0,0 +1,93 @@ +""" +MCP Server entry point for Gitea integration. + +Provides Gitea tools to Claude Code via JSON-RPC 2.0 over stdio. +""" +import asyncio +import logging +from mcp.server import Server +from mcp.server.stdio import stdio_server +from mcp.types import Tool, TextContent + +from .config import GiteaConfig +from .gitea_client import GiteaClient +from .tool_registry import get_tool_definitions, create_tool_dispatcher + +# Suppress noisy MCP validation warnings on stderr +logging.basicConfig(level=logging.INFO) +logging.getLogger("root").setLevel(logging.ERROR) +logging.getLogger("mcp").setLevel(logging.ERROR) +logger = logging.getLogger(__name__) + + +class GiteaMCPServer: + """MCP Server for Gitea integration""" + + def __init__(self): + self.server = Server("gitea-mcp") + self.config = None + self.client = None + self._dispatcher = None + + async def initialize(self): + """ + Initialize server and load configuration. + + Raises: + Exception: If initialization fails + """ + try: + config_loader = GiteaConfig() + self.config = config_loader.load() + + self.client = GiteaClient() + self._dispatcher = create_tool_dispatcher(self.client) + + logger.info(f"Gitea MCP Server initialized in {self.config['mode']} mode") + except Exception as e: + logger.error(f"Failed to initialize: {e}") + raise + + def setup_tools(self): + """Register all available tools with the MCP server""" + + @self.server.list_tools() + async def list_tools() -> list[Tool]: + """Return list of available tools""" + return get_tool_definitions() + + @self.server.call_tool() + async def call_tool(name: str, arguments: dict) -> list[TextContent]: + """ + Handle tool invocation. + + Args: + name: Tool name + arguments: Tool arguments + + Returns: + List of TextContent with results + """ + return await self._dispatcher(name, arguments) + + async def run(self): + """Run the MCP server""" + await self.initialize() + self.setup_tools() + + async with stdio_server() as (read_stream, write_stream): + await self.server.run( + read_stream, + write_stream, + self.server.create_initialization_options() + ) + + +async def main(): + """Main entry point""" + server = GiteaMCPServer() + await server.run() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/gitea_mcp/tool_registry.py b/gitea_mcp/tool_registry.py new file mode 100644 index 0000000..b2f39c4 --- /dev/null +++ b/gitea_mcp/tool_registry.py @@ -0,0 +1,1098 @@ +""" +Tool registry for Gitea MCP Server. + +Provides transport-agnostic tool definitions and dispatch logic that can be +used by both the stdio server and external consumers (e.g., HTTP transport). + +Usage: + from gitea_mcp.tool_registry import get_tool_definitions, create_tool_dispatcher + + # Get all tool schemas + tools = get_tool_definitions() + + # Get filtered tool schemas + tools = get_tool_definitions(tool_filter=["list_issues", "get_issue"]) + + # Create dispatcher + dispatch = create_tool_dispatcher(client) + result = await dispatch("list_issues", {"state": "open"}) +""" +import json +import logging +from typing import Callable, Awaitable, Optional + +from mcp.types import Tool, TextContent + +from .gitea_client import GiteaClient +from .tools.issues import IssueTools +from .tools.labels import LabelTools +from .tools.wiki import WikiTools +from .tools.milestones import MilestoneTools +from .tools.dependencies import DependencyTools +from .tools.pull_requests import PullRequestTools + +logger = logging.getLogger(__name__) + + +def _coerce_types(arguments: dict) -> dict: + """ + Coerce argument types to handle MCP serialization quirks. + + MCP sometimes passes integers as strings and arrays as JSON strings. + This function normalizes them to the expected Python types. + """ + coerced = {} + for key, value in arguments.items(): + if value is None: + coerced[key] = value + continue + + # Coerce integer fields + int_fields = {'issue_number', 'milestone_id', 'pr_number', 'depends_on', 'milestone', 'limit'} + if key in int_fields and isinstance(value, str): + try: + coerced[key] = int(value) + continue + except ValueError: + pass + + # Coerce array fields that might be JSON strings + array_fields = {'labels', 'tags', 'issue_numbers', 'comments'} + if key in array_fields and isinstance(value, str): + try: + parsed = json.loads(value) + if isinstance(parsed, list): + coerced[key] = parsed + continue + except json.JSONDecodeError: + pass + + coerced[key] = value + + return coerced + + +def _get_all_tool_definitions() -> list[Tool]: + """ + Return the complete list of Tool definitions. + + This is the single source of truth for all tool schemas. + """ + return [ + Tool( + name="list_issues", + description="List issues from Gitea repository", + inputSchema={ + "type": "object", + "properties": { + "state": { + "type": "string", + "enum": ["open", "closed", "all"], + "default": "open", + "description": "Issue state filter" + }, + "labels": { + "type": "array", + "items": {"type": "string"}, + "description": "Filter by labels" + }, + "milestone": { + "type": "string", + "description": "Filter by milestone title (exact match)" + }, + "repo": { + "type": "string", + "description": "Repository name (for PMO mode)" + } + } + } + ), + Tool( + name="get_issue", + description="Get specific issue details", + inputSchema={ + "type": "object", + "properties": { + "issue_number": { + "type": ["integer", "string"], + "description": "Issue number" + }, + "repo": { + "type": "string", + "description": "Repository name (for PMO mode)" + } + }, + "required": ["issue_number"] + } + ), + Tool( + name="create_issue", + description="Create a new issue in Gitea", + inputSchema={ + "type": "object", + "properties": { + "title": { + "type": "string", + "description": "Issue title" + }, + "body": { + "type": "string", + "description": "Issue description" + }, + "labels": { + "type": "array", + "items": {"type": "string"}, + "description": "List of label names" + }, + "repo": { + "type": "string", + "description": "Repository name (for PMO mode)" + } + }, + "required": ["title", "body"] + } + ), + Tool( + name="update_issue", + description="Update existing issue", + inputSchema={ + "type": "object", + "properties": { + "issue_number": { + "type": ["integer", "string"], + "description": "Issue number" + }, + "title": { + "type": "string", + "description": "New title" + }, + "body": { + "type": "string", + "description": "New body" + }, + "state": { + "type": "string", + "enum": ["open", "closed"], + "description": "New state" + }, + "labels": { + "type": "array", + "items": {"type": "string"}, + "description": "New labels" + }, + "milestone": { + "type": ["integer", "string"], + "description": "Milestone ID to assign" + }, + "repo": { + "type": "string", + "description": "Repository name (for PMO mode)" + } + }, + "required": ["issue_number"] + } + ), + Tool( + name="add_comment", + description="Add comment to issue", + inputSchema={ + "type": "object", + "properties": { + "issue_number": { + "type": ["integer", "string"], + "description": "Issue number" + }, + "comment": { + "type": "string", + "description": "Comment text" + }, + "repo": { + "type": "string", + "description": "Repository name (for PMO mode)" + } + }, + "required": ["issue_number", "comment"] + } + ), + Tool( + name="get_labels", + description="Get all available labels (org + repo)", + inputSchema={ + "type": "object", + "properties": { + "repo": { + "type": "string", + "description": "Repository name (for PMO mode)" + } + } + } + ), + Tool( + name="suggest_labels", + description="Analyze context and suggest appropriate labels", + inputSchema={ + "type": "object", + "properties": { + "context": { + "type": "string", + "description": "Issue title + description or sprint context" + }, + "repo": { + "type": "string", + "description": "Repository name (owner/repo format)" + } + }, + "required": ["context"] + } + ), + Tool( + name="aggregate_issues", + description="Fetch issues across all repositories (PMO mode)", + inputSchema={ + "type": "object", + "properties": { + "org": { + "type": "string", + "description": "Organization name (e.g. 'bandit')" + }, + "state": { + "type": "string", + "enum": ["open", "closed", "all"], + "default": "open", + "description": "Issue state filter" + }, + "labels": { + "type": "array", + "items": {"type": "string"}, + "description": "Filter by labels" + } + }, + "required": ["org"] + } + ), + # Wiki Tools (Lessons Learned) + Tool( + name="list_wiki_pages", + description="List all wiki pages in repository", + inputSchema={ + "type": "object", + "properties": { + "repo": { + "type": "string", + "description": "Repository name (owner/repo format)" + } + } + } + ), + Tool( + name="get_wiki_page", + description="Get a specific wiki page by name", + inputSchema={ + "type": "object", + "properties": { + "page_name": { + "type": "string", + "description": "Wiki page name/path" + }, + "repo": { + "type": "string", + "description": "Repository name (owner/repo format)" + } + }, + "required": ["page_name"] + } + ), + Tool( + name="create_wiki_page", + description="Create a new wiki page", + inputSchema={ + "type": "object", + "properties": { + "title": { + "type": "string", + "description": "Page title/name" + }, + "content": { + "type": "string", + "description": "Page content (markdown)" + }, + "repo": { + "type": "string", + "description": "Repository name (owner/repo format)" + } + }, + "required": ["title", "content"] + } + ), + Tool( + name="update_wiki_page", + description="Update an existing wiki page", + inputSchema={ + "type": "object", + "properties": { + "page_name": { + "type": "string", + "description": "Wiki page name/path" + }, + "content": { + "type": "string", + "description": "New page content (markdown)" + }, + "repo": { + "type": "string", + "description": "Repository name (owner/repo format)" + } + }, + "required": ["page_name", "content"] + } + ), + Tool( + name="create_lesson", + description="Create a lessons learned entry in the wiki", + inputSchema={ + "type": "object", + "properties": { + "title": { + "type": "string", + "description": "Lesson title (e.g., 'Sprint 16 - Prevent Infinite Loops')" + }, + "content": { + "type": "string", + "description": "Lesson content (markdown with context, problem, solution, prevention)" + }, + "tags": { + "type": "array", + "items": {"type": "string"}, + "description": "Tags for categorization" + }, + "category": { + "type": "string", + "default": "sprints", + "description": "Category (sprints, patterns, architecture, etc.)" + }, + "repo": { + "type": "string", + "description": "Repository name (owner/repo format)" + } + }, + "required": ["title", "content", "tags"] + } + ), + Tool( + name="search_lessons", + description="Search lessons learned from previous sprints", + inputSchema={ + "type": "object", + "properties": { + "query": { + "type": "string", + "description": "Search query (optional)" + }, + "tags": { + "type": "array", + "items": {"type": "string"}, + "description": "Tags to filter by (optional)" + }, + "limit": { + "type": ["integer", "string"], + "default": 20, + "description": "Maximum results" + }, + "repo": { + "type": "string", + "description": "Repository name (owner/repo format)" + } + } + } + ), + Tool( + name="allocate_rfc_number", + description="Allocate the next available RFC number by scanning existing RFC wiki pages", + inputSchema={ + "type": "object", + "properties": { + "repo": { + "type": "string", + "description": "Repository name (owner/repo format)" + } + } + } + ), + # Milestone Tools + Tool( + name="list_milestones", + description="List all milestones in repository", + inputSchema={ + "type": "object", + "properties": { + "state": { + "type": "string", + "enum": ["open", "closed", "all"], + "default": "open", + "description": "Milestone state filter" + }, + "repo": { + "type": "string", + "description": "Repository name (owner/repo format)" + } + } + } + ), + Tool( + name="get_milestone", + description="Get a specific milestone by ID", + inputSchema={ + "type": "object", + "properties": { + "milestone_id": { + "type": ["integer", "string"], + "description": "Milestone ID" + }, + "repo": { + "type": "string", + "description": "Repository name (owner/repo format)" + } + }, + "required": ["milestone_id"] + } + ), + Tool( + name="create_milestone", + description="Create a new milestone", + inputSchema={ + "type": "object", + "properties": { + "title": { + "type": "string", + "description": "Milestone title" + }, + "description": { + "type": "string", + "description": "Milestone description" + }, + "due_on": { + "type": "string", + "description": "Due date (ISO 8601 format)" + }, + "repo": { + "type": "string", + "description": "Repository name (owner/repo format)" + } + }, + "required": ["title"] + } + ), + Tool( + name="update_milestone", + description="Update an existing milestone", + inputSchema={ + "type": "object", + "properties": { + "milestone_id": { + "type": ["integer", "string"], + "description": "Milestone ID" + }, + "title": { + "type": "string", + "description": "New title" + }, + "description": { + "type": "string", + "description": "New description" + }, + "state": { + "type": "string", + "enum": ["open", "closed"], + "description": "New state" + }, + "due_on": { + "type": "string", + "description": "New due date" + }, + "repo": { + "type": "string", + "description": "Repository name (owner/repo format)" + } + }, + "required": ["milestone_id"] + } + ), + Tool( + name="delete_milestone", + description="Delete a milestone", + inputSchema={ + "type": "object", + "properties": { + "milestone_id": { + "type": ["integer", "string"], + "description": "Milestone ID" + }, + "repo": { + "type": "string", + "description": "Repository name (owner/repo format)" + } + }, + "required": ["milestone_id"] + } + ), + # Dependency Tools + Tool( + name="list_issue_dependencies", + description="List all dependencies for an issue (issues that block this one)", + inputSchema={ + "type": "object", + "properties": { + "issue_number": { + "type": ["integer", "string"], + "description": "Issue number" + }, + "repo": { + "type": "string", + "description": "Repository name (owner/repo format)" + } + }, + "required": ["issue_number"] + } + ), + Tool( + name="create_issue_dependency", + description="Create a dependency (issue depends on another issue)", + inputSchema={ + "type": "object", + "properties": { + "issue_number": { + "type": ["integer", "string"], + "description": "Issue that will depend on another" + }, + "depends_on": { + "type": ["integer", "string"], + "description": "Issue that blocks issue_number" + }, + "repo": { + "type": "string", + "description": "Repository name (owner/repo format)" + } + }, + "required": ["issue_number", "depends_on"] + } + ), + Tool( + name="remove_issue_dependency", + description="Remove a dependency between issues", + inputSchema={ + "type": "object", + "properties": { + "issue_number": { + "type": ["integer", "string"], + "description": "Issue that depends on another" + }, + "depends_on": { + "type": ["integer", "string"], + "description": "Issue being depended on" + }, + "repo": { + "type": "string", + "description": "Repository name (owner/repo format)" + } + }, + "required": ["issue_number", "depends_on"] + } + ), + Tool( + name="get_execution_order", + description="Get parallelizable execution order for issues based on dependencies", + inputSchema={ + "type": "object", + "properties": { + "issue_numbers": { + "type": "array", + "items": {"type": "integer"}, + "description": "List of issue numbers to analyze" + }, + "repo": { + "type": "string", + "description": "Repository name (owner/repo format)" + } + }, + "required": ["issue_numbers"] + } + ), + # Validation Tools + Tool( + name="validate_repo_org", + description="Check if repository belongs to an organization", + inputSchema={ + "type": "object", + "properties": { + "repo": { + "type": "string", + "description": "Repository name (owner/repo format)" + } + } + } + ), + Tool( + name="get_branch_protection", + description="Get branch protection rules", + inputSchema={ + "type": "object", + "properties": { + "branch": { + "type": "string", + "description": "Branch name" + }, + "repo": { + "type": "string", + "description": "Repository name (owner/repo format)" + } + }, + "required": ["branch"] + } + ), + Tool( + name="create_label", + description="Create a new label in the repository (for repo-specific labels like Component/*, Tech/*)", + inputSchema={ + "type": "object", + "properties": { + "name": { + "type": "string", + "description": "Label name (e.g., 'Component/Backend', 'Tech/Python')" + }, + "color": { + "type": "string", + "description": "Label color (hex code)" + }, + "description": { + "type": "string", + "description": "Label description" + }, + "repo": { + "type": "string", + "description": "Repository name (owner/repo format)" + } + }, + "required": ["name", "color"] + } + ), + Tool( + name="create_org_label", + description="Create a new label at organization level (for workflow labels like Type/*, Priority/*, Complexity/*, Effort/*)", + inputSchema={ + "type": "object", + "properties": { + "org": { + "type": "string", + "description": "Organization name" + }, + "name": { + "type": "string", + "description": "Label name (e.g., 'Type/Bug', 'Priority/High')" + }, + "color": { + "type": "string", + "description": "Label color (hex code)" + }, + "description": { + "type": "string", + "description": "Label description" + } + }, + "required": ["org", "name", "color"] + } + ), + Tool( + name="create_label_smart", + description="Create a label at the appropriate level (org or repo) based on category. Org: Type/*, Priority/*, Complexity/*, Effort/*, Risk/*, Source/*, Agent/*. Repo: Component/*, Tech/*", + inputSchema={ + "type": "object", + "properties": { + "name": { + "type": "string", + "description": "Label name (e.g., 'Type/Bug', 'Component/Backend')" + }, + "color": { + "type": "string", + "description": "Label color (hex code)" + }, + "description": { + "type": "string", + "description": "Label description" + }, + "repo": { + "type": "string", + "description": "Repository name (owner/repo format)" + } + }, + "required": ["name", "color"] + } + ), + # Pull Request Tools + Tool( + name="list_pull_requests", + description="List pull requests from repository", + inputSchema={ + "type": "object", + "properties": { + "state": { + "type": "string", + "enum": ["open", "closed", "all"], + "default": "open", + "description": "PR state filter" + }, + "sort": { + "type": "string", + "enum": ["oldest", "recentupdate", "leastupdate", "mostcomment", "leastcomment", "priority"], + "default": "recentupdate", + "description": "Sort order" + }, + "labels": { + "type": "array", + "items": {"type": "string"}, + "description": "Filter by labels" + }, + "repo": { + "type": "string", + "description": "Repository name (owner/repo format)" + } + } + } + ), + Tool( + name="get_pull_request", + description="Get specific pull request details", + inputSchema={ + "type": "object", + "properties": { + "pr_number": { + "type": ["integer", "string"], + "description": "Pull request number" + }, + "repo": { + "type": "string", + "description": "Repository name (owner/repo format)" + } + }, + "required": ["pr_number"] + } + ), + Tool( + name="get_pr_diff", + description="Get the diff for a pull request", + inputSchema={ + "type": "object", + "properties": { + "pr_number": { + "type": ["integer", "string"], + "description": "Pull request number" + }, + "repo": { + "type": "string", + "description": "Repository name (owner/repo format)" + } + }, + "required": ["pr_number"] + } + ), + Tool( + name="get_pr_comments", + description="Get comments on a pull request", + inputSchema={ + "type": "object", + "properties": { + "pr_number": { + "type": ["integer", "string"], + "description": "Pull request number" + }, + "repo": { + "type": "string", + "description": "Repository name (owner/repo format)" + } + }, + "required": ["pr_number"] + } + ), + Tool( + name="create_pr_review", + description="Create a review on a pull request (approve, request changes, or comment)", + inputSchema={ + "type": "object", + "properties": { + "pr_number": { + "type": ["integer", "string"], + "description": "Pull request number" + }, + "body": { + "type": "string", + "description": "Review body/summary" + }, + "event": { + "type": "string", + "enum": ["APPROVE", "REQUEST_CHANGES", "COMMENT"], + "default": "COMMENT", + "description": "Review action" + }, + "comments": { + "type": "array", + "items": { + "type": "object", + "properties": { + "path": {"type": "string"}, + "position": {"type": ["integer", "string"]}, + "body": {"type": "string"} + } + }, + "description": "Optional inline comments" + }, + "repo": { + "type": "string", + "description": "Repository name (owner/repo format)" + } + }, + "required": ["pr_number", "body"] + } + ), + Tool( + name="add_pr_comment", + description="Add a general comment to a pull request", + inputSchema={ + "type": "object", + "properties": { + "pr_number": { + "type": ["integer", "string"], + "description": "Pull request number" + }, + "body": { + "type": "string", + "description": "Comment text" + }, + "repo": { + "type": "string", + "description": "Repository name (owner/repo format)" + } + }, + "required": ["pr_number", "body"] + } + ), + Tool( + name="create_pull_request", + description="Create a new pull request", + inputSchema={ + "type": "object", + "properties": { + "title": { + "type": "string", + "description": "PR title" + }, + "body": { + "type": "string", + "description": "PR description/body" + }, + "head": { + "type": "string", + "description": "Source branch name (the branch with changes)" + }, + "base": { + "type": "string", + "description": "Target branch name (the branch to merge into)" + }, + "labels": { + "type": "array", + "items": {"type": "string"}, + "description": "Optional list of label names" + }, + "repo": { + "type": "string", + "description": "Repository name (owner/repo format)" + } + }, + "required": ["title", "body", "head", "base"] + } + ) + ] + + +def get_tool_definitions(tool_filter: Optional[list[str]] = None) -> list[Tool]: + """ + Get tool definitions, optionally filtered by name. + + Args: + tool_filter: Optional list of tool names to include. If None, returns all tools. + + Returns: + List of Tool objects + """ + all_tools = _get_all_tool_definitions() + + if tool_filter is None: + return all_tools + + filter_set = set(tool_filter) + return [tool for tool in all_tools if tool.name in filter_set] + + +def create_tool_dispatcher( + client: GiteaClient, + tool_filter: Optional[list[str]] = None +) -> Callable[[str, dict], Awaitable[list[TextContent]]]: + """ + Create a tool dispatcher function bound to the given client. + + Args: + client: GiteaClient instance + tool_filter: Optional list of tool names to allow. If None, all tools are allowed. + + Returns: + Async function that dispatches tool calls: dispatch(name, arguments) -> list[TextContent] + """ + # Initialize tool handlers + issue_tools = IssueTools(client) + label_tools = LabelTools(client) + wiki_tools = WikiTools(client) + milestone_tools = MilestoneTools(client) + dependency_tools = DependencyTools(client) + pr_tools = PullRequestTools(client) + + # Build filter set if provided + filter_set = set(tool_filter) if tool_filter else None + + async def dispatch(name: str, arguments: dict) -> list[TextContent]: + """ + Dispatch a tool call to the appropriate handler. + + Args: + name: Tool name + arguments: Tool arguments + + Returns: + List of TextContent with results + """ + try: + # Check filter if provided + if filter_set and name not in filter_set: + raise ValueError(f"Tool not available: {name}") + + # Coerce types to handle MCP serialization quirks + arguments = _coerce_types(arguments) + + # Route to appropriate tool handler + if name == "list_issues": + result = await issue_tools.list_issues(**arguments) + elif name == "get_issue": + result = await issue_tools.get_issue(**arguments) + elif name == "create_issue": + result = await issue_tools.create_issue(**arguments) + elif name == "update_issue": + result = await issue_tools.update_issue(**arguments) + elif name == "add_comment": + result = await issue_tools.add_comment(**arguments) + elif name == "get_labels": + result = await label_tools.get_labels(**arguments) + elif name == "suggest_labels": + result = await label_tools.suggest_labels(**arguments) + elif name == "aggregate_issues": + result = await issue_tools.aggregate_issues(**arguments) + # Wiki tools + elif name == "list_wiki_pages": + result = await wiki_tools.list_wiki_pages(**arguments) + elif name == "get_wiki_page": + result = await wiki_tools.get_wiki_page(**arguments) + elif name == "create_wiki_page": + result = await wiki_tools.create_wiki_page(**arguments) + elif name == "update_wiki_page": + result = await wiki_tools.update_wiki_page(**arguments) + elif name == "create_lesson": + result = await wiki_tools.create_lesson(**arguments) + elif name == "search_lessons": + tags = arguments.get('tags') + result = await wiki_tools.search_lessons( + query=arguments.get('query'), + tags=tags, + limit=arguments.get('limit', 20), + repo=arguments.get('repo') + ) + elif name == "allocate_rfc_number": + result = await wiki_tools.allocate_rfc_number( + repo=arguments.get('repo') + ) + # Milestone tools + elif name == "list_milestones": + result = await milestone_tools.list_milestones(**arguments) + elif name == "get_milestone": + result = await milestone_tools.get_milestone(**arguments) + elif name == "create_milestone": + result = await milestone_tools.create_milestone(**arguments) + elif name == "update_milestone": + result = await milestone_tools.update_milestone(**arguments) + elif name == "delete_milestone": + result = await milestone_tools.delete_milestone(**arguments) + # Dependency tools + elif name == "list_issue_dependencies": + result = await dependency_tools.list_issue_dependencies(**arguments) + elif name == "create_issue_dependency": + result = await dependency_tools.create_issue_dependency(**arguments) + elif name == "remove_issue_dependency": + result = await dependency_tools.remove_issue_dependency(**arguments) + elif name == "get_execution_order": + result = await dependency_tools.get_execution_order(**arguments) + # Validation tools + elif name == "validate_repo_org": + is_org = client.is_org_repo(arguments.get('repo')) + result = {'is_organization': is_org} + elif name == "get_branch_protection": + result = client.get_branch_protection( + arguments['branch'], + arguments.get('repo') + ) + elif name == "create_label": + result = client.create_label( + arguments['name'], + arguments['color'], + arguments.get('description'), + arguments.get('repo') + ) + elif name == "create_org_label": + result = client.create_org_label( + arguments['org'], + arguments['name'], + arguments['color'], + arguments.get('description') + ) + elif name == "create_label_smart": + result = await label_tools.create_label_smart( + arguments['name'], + arguments['color'], + arguments.get('description'), + arguments.get('repo') + ) + # Pull Request tools + elif name == "list_pull_requests": + result = await pr_tools.list_pull_requests(**arguments) + elif name == "get_pull_request": + result = await pr_tools.get_pull_request(**arguments) + elif name == "get_pr_diff": + result = await pr_tools.get_pr_diff(**arguments) + elif name == "get_pr_comments": + result = await pr_tools.get_pr_comments(**arguments) + elif name == "create_pr_review": + result = await pr_tools.create_pr_review(**arguments) + elif name == "add_pr_comment": + result = await pr_tools.add_pr_comment(**arguments) + elif name == "create_pull_request": + result = await pr_tools.create_pull_request(**arguments) + else: + raise ValueError(f"Unknown tool: {name}") + + return [TextContent( + type="text", + text=json.dumps(result, indent=2) + )] + + except Exception as e: + logger.error(f"Tool {name} failed: {e}") + return [TextContent( + type="text", + text=f"Error: {str(e)}" + )] + + return dispatch diff --git a/gitea_mcp/tools/__init__.py b/gitea_mcp/tools/__init__.py new file mode 100644 index 0000000..59b89aa --- /dev/null +++ b/gitea_mcp/tools/__init__.py @@ -0,0 +1,11 @@ +""" +MCP tools for Gitea integration. + +This package provides MCP tool implementations for: +- Issue operations (issues.py) +- Label management (labels.py) +- Wiki operations (wiki.py) +- Milestone management (milestones.py) +- Issue dependencies (dependencies.py) +- Pull request operations (pull_requests.py) +""" diff --git a/gitea_mcp/tools/dependencies.py b/gitea_mcp/tools/dependencies.py new file mode 100644 index 0000000..6378b45 --- /dev/null +++ b/gitea_mcp/tools/dependencies.py @@ -0,0 +1,216 @@ +""" +Issue dependency management tools for MCP server. + +Provides async wrappers for issue dependency operations: +- List/create/remove dependencies +- Build dependency graphs for parallel execution +""" +import asyncio +import logging +from typing import List, Dict, Optional, Set, Tuple + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +class DependencyTools: + """Async wrappers for Gitea issue dependency operations""" + + def __init__(self, gitea_client): + """ + Initialize dependency tools. + + Args: + gitea_client: GiteaClient instance + """ + self.gitea = gitea_client + + async def list_issue_dependencies( + self, + issue_number: int, + repo: Optional[str] = None + ) -> List[Dict]: + """ + List all dependencies for an issue (issues that block this one). + + Args: + issue_number: Issue number + repo: Repository in owner/repo format + + Returns: + List of issues that this issue depends on + """ + loop = asyncio.get_event_loop() + return await loop.run_in_executor( + None, + lambda: self.gitea.list_issue_dependencies(issue_number, repo) + ) + + async def create_issue_dependency( + self, + issue_number: int, + depends_on: int, + repo: Optional[str] = None + ) -> Dict: + """ + Create a dependency between issues. + + Args: + issue_number: The issue that will depend on another + depends_on: The issue that blocks issue_number + repo: Repository in owner/repo format + + Returns: + Created dependency information + """ + loop = asyncio.get_event_loop() + return await loop.run_in_executor( + None, + lambda: self.gitea.create_issue_dependency(issue_number, depends_on, repo) + ) + + async def remove_issue_dependency( + self, + issue_number: int, + depends_on: int, + repo: Optional[str] = None + ) -> bool: + """ + Remove a dependency between issues. + + Args: + issue_number: The issue that currently depends on another + depends_on: The issue being depended on + repo: Repository in owner/repo format + + Returns: + True if removed successfully + """ + loop = asyncio.get_event_loop() + return await loop.run_in_executor( + None, + lambda: self.gitea.remove_issue_dependency(issue_number, depends_on, repo) + ) + + async def list_issue_blocks( + self, + issue_number: int, + repo: Optional[str] = None + ) -> List[Dict]: + """ + List all issues that this issue blocks. + + Args: + issue_number: Issue number + repo: Repository in owner/repo format + + Returns: + List of issues blocked by this issue + """ + loop = asyncio.get_event_loop() + return await loop.run_in_executor( + None, + lambda: self.gitea.list_issue_blocks(issue_number, repo) + ) + + async def build_dependency_graph( + self, + issue_numbers: List[int], + repo: Optional[str] = None + ) -> Dict[int, List[int]]: + """ + Build a dependency graph for a list of issues. + + Args: + issue_numbers: List of issue numbers to analyze + repo: Repository in owner/repo format + + Returns: + Dictionary mapping issue_number -> list of issues it depends on + """ + graph = {} + for issue_num in issue_numbers: + try: + deps = await self.list_issue_dependencies(issue_num, repo) + graph[issue_num] = [ + d.get('number') or d.get('index') + for d in deps + if (d.get('number') or d.get('index')) in issue_numbers + ] + except Exception as e: + logger.warning(f"Could not fetch dependencies for #{issue_num}: {e}") + graph[issue_num] = [] + return graph + + async def get_ready_tasks( + self, + issue_numbers: List[int], + completed: Set[int], + repo: Optional[str] = None + ) -> List[int]: + """ + Get tasks that are ready to execute (no unresolved dependencies). + + Args: + issue_numbers: List of all issue numbers in sprint + completed: Set of already completed issue numbers + repo: Repository in owner/repo format + + Returns: + List of issue numbers that can be executed now + """ + graph = await self.build_dependency_graph(issue_numbers, repo) + ready = [] + + for issue_num in issue_numbers: + if issue_num in completed: + continue + + deps = graph.get(issue_num, []) + # Task is ready if all its dependencies are completed + if all(dep in completed for dep in deps): + ready.append(issue_num) + + return ready + + async def get_execution_order( + self, + issue_numbers: List[int], + repo: Optional[str] = None + ) -> List[List[int]]: + """ + Get a parallelizable execution order for issues. + + Returns batches of issues that can be executed in parallel. + Each batch contains issues with no unresolved dependencies. + + Args: + issue_numbers: List of all issue numbers + repo: Repository in owner/repo format + + Returns: + List of batches, where each batch can be executed in parallel + """ + graph = await self.build_dependency_graph(issue_numbers, repo) + completed: Set[int] = set() + remaining = set(issue_numbers) + batches = [] + + while remaining: + # Find all tasks with no unresolved dependencies + batch = [] + for issue_num in remaining: + deps = graph.get(issue_num, []) + if all(dep in completed for dep in deps): + batch.append(issue_num) + + if not batch: + # Circular dependency detected + logger.error(f"Circular dependency detected! Remaining: {remaining}") + batch = list(remaining) # Force include remaining to avoid infinite loop + + batches.append(batch) + completed.update(batch) + remaining -= set(batch) + + return batches diff --git a/gitea_mcp/tools/issues.py b/gitea_mcp/tools/issues.py new file mode 100644 index 0000000..04e63cf --- /dev/null +++ b/gitea_mcp/tools/issues.py @@ -0,0 +1,287 @@ +""" +Issue management tools for MCP server. + +Provides async wrappers for issue CRUD operations with: +- Branch-aware security +- PMO multi-repo support +- Comprehensive error handling +""" +import asyncio +import os +import subprocess +import logging +from typing import List, Dict, Optional + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +class IssueTools: + """Async wrappers for Gitea issue operations with branch detection""" + + def __init__(self, gitea_client): + """ + Initialize issue tools. + + Args: + gitea_client: GiteaClient instance + """ + self.gitea = gitea_client + + def _get_project_directory(self) -> Optional[str]: + """ + Get the user's project directory from environment. + + Returns: + Project directory path or None if not set + """ + return os.environ.get('CLAUDE_PROJECT_DIR') + + def _get_current_branch(self) -> str: + """ + Get current git branch from user's project directory. + + Uses CLAUDE_PROJECT_DIR environment variable to determine the correct + directory for git operations, avoiding the bug where git runs from + the installed plugin directory instead of the user's project. + + Returns: + Current branch name or 'unknown' if not in a git repo + """ + try: + project_dir = self._get_project_directory() + result = subprocess.run( + ['git', 'rev-parse', '--abbrev-ref', 'HEAD'], + capture_output=True, + text=True, + check=True, + cwd=project_dir # Run git in project directory, not plugin directory + ) + return result.stdout.strip() + except subprocess.CalledProcessError: + return "unknown" + + def _check_branch_permissions(self, operation: str) -> bool: + """ + Check if operation is allowed on current branch. + + Args: + operation: Operation name (list_issues, create_issue, etc.) + + Returns: + True if operation is allowed, False otherwise + """ + branch = self._get_current_branch() + + # Production branches (read-only except incidents) + if branch in ['main', 'master'] or branch.startswith('prod/'): + return operation in ['list_issues', 'get_issue', 'get_labels'] + + # Staging branches (read-only for code) + if branch == 'staging' or branch.startswith('stage/'): + return operation in ['list_issues', 'get_issue', 'get_labels', 'create_issue'] + + # Development branches (full access) + # Include all common feature/fix branch patterns + dev_prefixes = ( + 'feat/', 'feature/', 'dev/', + 'fix/', 'bugfix/', 'hotfix/', + 'chore/', 'refactor/', 'docs/', 'test/' + ) + if branch in ['development', 'develop'] or branch.startswith(dev_prefixes): + return True + + # Unknown branch - be restrictive + return False + + async def list_issues( + self, + state: str = 'open', + labels: Optional[List[str]] = None, + milestone: Optional[str] = None, + repo: Optional[str] = None + ) -> List[Dict]: + """ + List issues from repository (async wrapper). + + Args: + state: Issue state (open, closed, all) + labels: Filter by labels + milestone: Filter by milestone title (exact match) + repo: Override configured repo (for PMO multi-repo) + + Returns: + List of issue dictionaries + + Raises: + PermissionError: If operation not allowed on current branch + """ + if not self._check_branch_permissions('list_issues'): + branch = self._get_current_branch() + raise PermissionError( + f"Cannot list issues on branch '{branch}'. " + f"Switch to a development branch." + ) + + loop = asyncio.get_event_loop() + return await loop.run_in_executor( + None, + lambda: self.gitea.list_issues(state, labels, milestone, repo) + ) + + async def get_issue( + self, + issue_number: int, + repo: Optional[str] = None + ) -> Dict: + """ + Get specific issue details (async wrapper). + + Args: + issue_number: Issue number + repo: Override configured repo (for PMO multi-repo) + + Returns: + Issue dictionary + + Raises: + PermissionError: If operation not allowed on current branch + """ + if not self._check_branch_permissions('get_issue'): + branch = self._get_current_branch() + raise PermissionError( + f"Cannot get issue on branch '{branch}'. " + f"Switch to a development branch." + ) + + loop = asyncio.get_event_loop() + return await loop.run_in_executor( + None, + lambda: self.gitea.get_issue(issue_number, repo) + ) + + async def create_issue( + self, + title: str, + body: str, + labels: Optional[List[str]] = None, + repo: Optional[str] = None + ) -> Dict: + """ + Create new issue (async wrapper with branch check). + + Args: + title: Issue title + body: Issue description + labels: List of label names + repo: Override configured repo (for PMO multi-repo) + + Returns: + Created issue dictionary + + Raises: + PermissionError: If operation not allowed on current branch + """ + if not self._check_branch_permissions('create_issue'): + branch = self._get_current_branch() + raise PermissionError( + f"Cannot create issues on branch '{branch}'. " + f"Switch to a development branch to create issues." + ) + + loop = asyncio.get_event_loop() + return await loop.run_in_executor( + None, + lambda: self.gitea.create_issue(title, body, labels, repo) + ) + + async def update_issue( + self, + issue_number: int, + title: Optional[str] = None, + body: Optional[str] = None, + state: Optional[str] = None, + labels: Optional[List[str]] = None, + milestone: Optional[int] = None, + repo: Optional[str] = None + ) -> Dict: + """ + Update existing issue (async wrapper with branch check). + + Args: + issue_number: Issue number + title: New title (optional) + body: New body (optional) + state: New state - 'open' or 'closed' (optional) + labels: New labels (optional) + milestone: Milestone ID to assign (optional) + repo: Override configured repo (for PMO multi-repo) + + Returns: + Updated issue dictionary + + Raises: + PermissionError: If operation not allowed on current branch + """ + if not self._check_branch_permissions('update_issue'): + branch = self._get_current_branch() + raise PermissionError( + f"Cannot update issues on branch '{branch}'. " + f"Switch to a development branch to update issues." + ) + + loop = asyncio.get_event_loop() + return await loop.run_in_executor( + None, + lambda: self.gitea.update_issue(issue_number, title, body, state, labels, milestone, repo) + ) + + async def add_comment( + self, + issue_number: int, + comment: str, + repo: Optional[str] = None + ) -> Dict: + """ + Add comment to issue (async wrapper with branch check). + + Args: + issue_number: Issue number + comment: Comment text + repo: Override configured repo (for PMO multi-repo) + + Returns: + Created comment dictionary + + Raises: + PermissionError: If operation not allowed on current branch + """ + if not self._check_branch_permissions('add_comment'): + branch = self._get_current_branch() + raise PermissionError( + f"Cannot add comments on branch '{branch}'. " + f"Switch to a development branch to add comments." + ) + + loop = asyncio.get_event_loop() + return await loop.run_in_executor( + None, + lambda: self.gitea.add_comment(issue_number, comment, repo) + ) + + async def aggregate_issues( + self, + org: str, + state: str = 'open', + labels: Optional[List[str]] = None + ) -> Dict[str, List[Dict]]: + """Aggregate issues across all repositories in org.""" + if not self._check_branch_permissions('aggregate_issues'): + branch = self._get_current_branch() + raise PermissionError(f"Cannot aggregate issues on branch '{branch}'.") + + loop = asyncio.get_event_loop() + return await loop.run_in_executor( + None, + lambda: self.gitea.aggregate_issues(org, state, labels) + ) diff --git a/gitea_mcp/tools/labels.py b/gitea_mcp/tools/labels.py new file mode 100644 index 0000000..c5859da --- /dev/null +++ b/gitea_mcp/tools/labels.py @@ -0,0 +1,377 @@ +""" +Label management tools for MCP server. + +Provides async wrappers for label operations with: +- Label taxonomy retrieval +- Intelligent label suggestion +- Dynamic label detection +""" +import asyncio +import logging +import re +from typing import List, Dict, Optional + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +class LabelTools: + """Async wrappers for Gitea label operations""" + + def __init__(self, gitea_client): + """ + Initialize label tools. + + Args: + gitea_client: GiteaClient instance + """ + self.gitea = gitea_client + + async def get_labels(self, repo: Optional[str] = None) -> Dict[str, List[Dict]]: + """Get all labels (org + repo if org-owned, repo-only if user-owned).""" + loop = asyncio.get_event_loop() + + target_repo = repo or self.gitea.repo + if not target_repo or '/' not in target_repo: + raise ValueError("Use 'owner/repo' format (e.g. 'org/repo-name')") + + # Check if repo belongs to an organization or user + is_org = await loop.run_in_executor( + None, + lambda: self.gitea.is_org_repo(target_repo) + ) + + org_labels = [] + if is_org: + org = target_repo.split('/')[0] + org_labels = await loop.run_in_executor( + None, + lambda: self.gitea.get_org_labels(org) + ) + + repo_labels = await loop.run_in_executor( + None, + lambda: self.gitea.get_labels(target_repo) + ) + + return { + 'organization': org_labels, + 'repository': repo_labels, + 'total_count': len(org_labels) + len(repo_labels) + } + + async def suggest_labels(self, context: str, repo: Optional[str] = None) -> List[str]: + """ + Analyze context and suggest appropriate labels from repository's actual labels. + + This method fetches actual labels from the repository and matches them + dynamically, supporting any label naming convention (slash, colon-space, etc.). + + Args: + context: Issue title + description or sprint context + repo: Repository in 'owner/repo' format (optional, uses default if not provided) + + Returns: + List of suggested label names that exist in the repository + """ + # Fetch actual labels from repository + target_repo = repo or self.gitea.repo + if not target_repo: + logger.warning("No repository specified, returning empty suggestions") + return [] + + try: + labels_data = await self.get_labels(target_repo) + all_labels = labels_data.get('organization', []) + labels_data.get('repository', []) + label_names = [label['name'] for label in all_labels] + except Exception as e: + logger.warning(f"Failed to fetch labels: {e}. Using fallback suggestions.") + label_names = [] + + # Build label lookup for dynamic matching + label_lookup = self._build_label_lookup(label_names) + + suggested = [] + context_lower = context.lower() + + # Type detection (exclusive - only one) + type_label = None + if any(word in context_lower for word in ['bug', 'error', 'fix', 'broken', 'crash', 'fail']): + type_label = self._find_label(label_lookup, 'type', 'bug') + elif any(word in context_lower for word in ['refactor', 'extract', 'restructure', 'architecture', 'service extraction']): + type_label = self._find_label(label_lookup, 'type', 'refactor') + elif any(word in context_lower for word in ['feature', 'add', 'implement', 'new', 'create']): + type_label = self._find_label(label_lookup, 'type', 'feature') + elif any(word in context_lower for word in ['docs', 'documentation', 'readme', 'guide']): + type_label = self._find_label(label_lookup, 'type', 'documentation') + elif any(word in context_lower for word in ['test', 'testing', 'spec', 'coverage']): + type_label = self._find_label(label_lookup, 'type', 'test') + elif any(word in context_lower for word in ['chore', 'maintenance', 'update', 'upgrade']): + type_label = self._find_label(label_lookup, 'type', 'chore') + if type_label: + suggested.append(type_label) + + # Priority detection + priority_label = None + if any(word in context_lower for word in ['critical', 'urgent', 'blocker', 'blocking', 'emergency']): + priority_label = self._find_label(label_lookup, 'priority', 'critical') + elif any(word in context_lower for word in ['high', 'important', 'asap', 'soon']): + priority_label = self._find_label(label_lookup, 'priority', 'high') + elif any(word in context_lower for word in ['low', 'nice-to-have', 'optional', 'later']): + priority_label = self._find_label(label_lookup, 'priority', 'low') + else: + priority_label = self._find_label(label_lookup, 'priority', 'medium') + if priority_label: + suggested.append(priority_label) + + # Complexity detection + complexity_label = None + if any(word in context_lower for word in ['simple', 'trivial', 'easy', 'quick']): + complexity_label = self._find_label(label_lookup, 'complexity', 'simple') + elif any(word in context_lower for word in ['complex', 'difficult', 'challenging', 'intricate']): + complexity_label = self._find_label(label_lookup, 'complexity', 'complex') + else: + complexity_label = self._find_label(label_lookup, 'complexity', 'medium') + if complexity_label: + suggested.append(complexity_label) + + # Effort detection (supports both "Effort" and "Efforts" naming) + effort_label = None + if any(word in context_lower for word in ['xs', 'tiny', '1 hour', '2 hours']): + effort_label = self._find_label(label_lookup, 'effort', 'xs') + elif any(word in context_lower for word in ['small', 's ', '1 day', 'half day']): + effort_label = self._find_label(label_lookup, 'effort', 's') + elif any(word in context_lower for word in ['medium', 'm ', '2 days', '3 days']): + effort_label = self._find_label(label_lookup, 'effort', 'm') + elif any(word in context_lower for word in ['large', 'l ', '1 week', '5 days']): + effort_label = self._find_label(label_lookup, 'effort', 'l') + elif any(word in context_lower for word in ['xl', 'extra large', '2 weeks', 'sprint']): + effort_label = self._find_label(label_lookup, 'effort', 'xl') + if effort_label: + suggested.append(effort_label) + + # Component detection (based on keywords) + component_mappings = { + 'backend': ['backend', 'server', 'api', 'database', 'service'], + 'frontend': ['frontend', 'ui', 'interface', 'react', 'vue', 'component'], + 'api': ['api', 'endpoint', 'rest', 'graphql', 'route'], + 'database': ['database', 'db', 'sql', 'migration', 'schema', 'postgres'], + 'auth': ['auth', 'authentication', 'login', 'oauth', 'token', 'session'], + 'deploy': ['deploy', 'deployment', 'docker', 'kubernetes', 'ci/cd'], + 'testing': ['test', 'testing', 'spec', 'jest', 'pytest', 'coverage'], + 'docs': ['docs', 'documentation', 'readme', 'guide', 'wiki'] + } + + for component, keywords in component_mappings.items(): + if any(keyword in context_lower for keyword in keywords): + label = self._find_label(label_lookup, 'component', component) + if label and label not in suggested: + suggested.append(label) + + # Tech stack detection + tech_mappings = { + 'python': ['python', 'fastapi', 'django', 'flask', 'pytest'], + 'javascript': ['javascript', 'js', 'node', 'npm', 'yarn'], + 'docker': ['docker', 'dockerfile', 'container', 'compose'], + 'postgresql': ['postgres', 'postgresql', 'psql', 'sql'], + 'redis': ['redis', 'cache', 'session store'], + 'vue': ['vue', 'vuejs', 'nuxt'], + 'fastapi': ['fastapi', 'pydantic', 'starlette'] + } + + for tech, keywords in tech_mappings.items(): + if any(keyword in context_lower for keyword in keywords): + label = self._find_label(label_lookup, 'tech', tech) + if label and label not in suggested: + suggested.append(label) + + # Source detection (based on git branch or context) + source_label = None + if 'development' in context_lower or 'dev/' in context_lower: + source_label = self._find_label(label_lookup, 'source', 'development') + elif 'staging' in context_lower or 'stage/' in context_lower: + source_label = self._find_label(label_lookup, 'source', 'staging') + elif 'production' in context_lower or 'prod' in context_lower: + source_label = self._find_label(label_lookup, 'source', 'production') + if source_label: + suggested.append(source_label) + + # Risk detection + risk_label = None + if any(word in context_lower for word in ['breaking', 'breaking change', 'major', 'risky']): + risk_label = self._find_label(label_lookup, 'risk', 'high') + elif any(word in context_lower for word in ['safe', 'low risk', 'minor']): + risk_label = self._find_label(label_lookup, 'risk', 'low') + if risk_label: + suggested.append(risk_label) + + logger.info(f"Suggested {len(suggested)} labels based on context and {len(label_names)} available labels") + return suggested + + def _build_label_lookup(self, label_names: List[str]) -> Dict[str, Dict[str, str]]: + """ + Build a lookup dictionary for label matching. + + Supports various label formats: + - Slash format: Type/Bug, Priority/High + - Colon-space format: Type: Bug, Priority: High + - Colon format: Type:Bug + + Args: + label_names: List of actual label names from repository + + Returns: + Nested dict: {category: {value: actual_label_name}} + """ + lookup: Dict[str, Dict[str, str]] = {} + + for label in label_names: + # Try different separator patterns + # Pattern: CategoryValue + # Separators: /, : , : + match = re.match(r'^([^/:]+)(?:/|:\s*|:)(.+)$', label) + if match: + category = match.group(1).lower().rstrip('s') # Normalize: "Efforts" -> "effort" + value = match.group(2).lower() + + if category not in lookup: + lookup[category] = {} + lookup[category][value] = label + + return lookup + + def _find_label(self, lookup: Dict[str, Dict[str, str]], category: str, value: str) -> Optional[str]: + """ + Find actual label name from lookup. + + Args: + lookup: Label lookup dictionary + category: Category to search (e.g., 'type', 'priority') + value: Value to find (e.g., 'bug', 'high') + + Returns: + Actual label name if found, None otherwise + """ + category_lower = category.lower().rstrip('s') # Normalize + value_lower = value.lower() + + if category_lower in lookup and value_lower in lookup[category_lower]: + return lookup[category_lower][value_lower] + + return None + + # Organization-level label categories (workflow labels shared across repos) + ORG_LABEL_CATEGORIES = {'agent', 'complexity', 'effort', 'efforts', 'priority', 'risk', 'source', 'type'} + + # Repository-level label categories (project-specific labels) + REPO_LABEL_CATEGORIES = {'component', 'tech'} + + async def create_label_smart( + self, + name: str, + color: str, + description: Optional[str] = None, + repo: Optional[str] = None + ) -> Dict: + """ + Create a label at the appropriate level (org or repo) based on category. + Skips if label already exists (checks both org and repo levels). + + Organization labels: Agent, Complexity, Effort, Priority, Risk, Source, Type + Repository labels: Component, Tech + + Args: + name: Label name (e.g., 'Type/Bug', 'Component/Backend') + color: Hex color code + description: Optional label description + repo: Repository in 'owner/repo' format + + Returns: + Created label dictionary with 'level' key, or 'skipped' if already exists + """ + loop = asyncio.get_event_loop() + + target_repo = repo or self.gitea.repo + if not target_repo or '/' not in target_repo: + raise ValueError("Use 'owner/repo' format (e.g. 'org/repo-name')") + + owner = target_repo.split('/')[0] + is_org = await loop.run_in_executor( + None, + lambda: self.gitea.is_org_repo(target_repo) + ) + + # Fetch existing labels to check for duplicates + existing_labels = await self.get_labels(target_repo) + all_existing = existing_labels.get('organization', []) + existing_labels.get('repository', []) + existing_names = [label['name'].lower() for label in all_existing] + + # Normalize the new label name for comparison + name_normalized = name.lower() + + # Also check for format variations (Type/Bug vs Type: Bug) + name_variations = [name_normalized] + if '/' in name: + name_variations.append(name.replace('/', ': ').lower()) + name_variations.append(name.replace('/', ':').lower()) + elif ': ' in name: + name_variations.append(name.replace(': ', '/').lower()) + elif ':' in name: + name_variations.append(name.replace(':', '/').lower()) + + # Check if label already exists in any format + for variation in name_variations: + if variation in existing_names: + logger.info(f"Label '{name}' already exists (found as '{variation}'), skipping") + return { + 'name': name, + 'skipped': True, + 'reason': f"Label already exists", + 'level': 'existing' + } + + # Parse category from label name + category = None + if '/' in name: + category = name.split('/')[0].lower().rstrip('s') + elif ':' in name: + category = name.split(':')[0].strip().lower().rstrip('s') + + # If it's an org repo and the category is an org-level category, create at org level + if is_org and category in self.ORG_LABEL_CATEGORIES: + result = await loop.run_in_executor( + None, + lambda: self.gitea.create_org_label(owner, name, color, description) + ) + # Handle unexpected response types (API may return list or non-dict) + if not isinstance(result, dict): + logger.error(f"Unexpected API response type for org label: {type(result)} - {result}") + return { + 'name': name, + 'error': True, + 'reason': f"API returned {type(result).__name__} instead of dict: {result}", + 'level': 'organization' + } + result['level'] = 'organization' + result['skipped'] = False + logger.info(f"Created organization label '{name}' in {owner}") + else: + # Create at repo level + result = await loop.run_in_executor( + None, + lambda: self.gitea.create_label(name, color, description, target_repo) + ) + # Handle unexpected response types (API may return list or non-dict) + if not isinstance(result, dict): + logger.error(f"Unexpected API response type for repo label: {type(result)} - {result}") + return { + 'name': name, + 'error': True, + 'reason': f"API returned {type(result).__name__} instead of dict: {result}", + 'level': 'repository' + } + result['level'] = 'repository' + result['skipped'] = False + logger.info(f"Created repository label '{name}' in {target_repo}") + + return result diff --git a/gitea_mcp/tools/milestones.py b/gitea_mcp/tools/milestones.py new file mode 100644 index 0000000..ddc15a0 --- /dev/null +++ b/gitea_mcp/tools/milestones.py @@ -0,0 +1,145 @@ +""" +Milestone management tools for MCP server. + +Provides async wrappers for milestone operations: +- CRUD operations for milestones +- Milestone-sprint relationship tracking +""" +import asyncio +import logging +from typing import List, Dict, Optional + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +class MilestoneTools: + """Async wrappers for Gitea milestone operations""" + + def __init__(self, gitea_client): + """ + Initialize milestone tools. + + Args: + gitea_client: GiteaClient instance + """ + self.gitea = gitea_client + + async def list_milestones( + self, + state: str = 'open', + repo: Optional[str] = None + ) -> List[Dict]: + """ + List all milestones in repository. + + Args: + state: Milestone state (open, closed, all) + repo: Repository in owner/repo format + + Returns: + List of milestone dictionaries + """ + loop = asyncio.get_event_loop() + return await loop.run_in_executor( + None, + lambda: self.gitea.list_milestones(state, repo) + ) + + async def get_milestone( + self, + milestone_id: int, + repo: Optional[str] = None + ) -> Dict: + """ + Get a specific milestone by ID. + + Args: + milestone_id: Milestone ID + repo: Repository in owner/repo format + + Returns: + Milestone dictionary + """ + loop = asyncio.get_event_loop() + return await loop.run_in_executor( + None, + lambda: self.gitea.get_milestone(milestone_id, repo) + ) + + async def create_milestone( + self, + title: str, + description: Optional[str] = None, + due_on: Optional[str] = None, + repo: Optional[str] = None + ) -> Dict: + """ + Create a new milestone. + + Args: + title: Milestone title (e.g., "v2.0 Release", "Sprint 17") + description: Milestone description + due_on: Due date in ISO 8601 format (e.g., "2025-02-01T00:00:00Z") + repo: Repository in owner/repo format + + Returns: + Created milestone dictionary + """ + loop = asyncio.get_event_loop() + return await loop.run_in_executor( + None, + lambda: self.gitea.create_milestone(title, description, due_on, repo) + ) + + async def update_milestone( + self, + milestone_id: int, + title: Optional[str] = None, + description: Optional[str] = None, + state: Optional[str] = None, + due_on: Optional[str] = None, + repo: Optional[str] = None + ) -> Dict: + """ + Update an existing milestone. + + Args: + milestone_id: Milestone ID + title: New title (optional) + description: New description (optional) + state: New state - 'open' or 'closed' (optional) + due_on: New due date (optional) + repo: Repository in owner/repo format + + Returns: + Updated milestone dictionary + """ + loop = asyncio.get_event_loop() + return await loop.run_in_executor( + None, + lambda: self.gitea.update_milestone( + milestone_id, title, description, state, due_on, repo + ) + ) + + async def delete_milestone( + self, + milestone_id: int, + repo: Optional[str] = None + ) -> bool: + """ + Delete a milestone. + + Args: + milestone_id: Milestone ID + repo: Repository in owner/repo format + + Returns: + True if deleted successfully + """ + loop = asyncio.get_event_loop() + return await loop.run_in_executor( + None, + lambda: self.gitea.delete_milestone(milestone_id, repo) + ) diff --git a/gitea_mcp/tools/pull_requests.py b/gitea_mcp/tools/pull_requests.py new file mode 100644 index 0000000..89734fc --- /dev/null +++ b/gitea_mcp/tools/pull_requests.py @@ -0,0 +1,335 @@ +""" +Pull request management tools for MCP server. + +Provides async wrappers for PR operations with: +- Branch-aware security +- PMO multi-repo support +- Comprehensive error handling +""" +import asyncio +import os +import subprocess +import logging +from typing import List, Dict, Optional + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +class PullRequestTools: + """Async wrappers for Gitea pull request operations with branch detection""" + + def __init__(self, gitea_client): + """ + Initialize pull request tools. + + Args: + gitea_client: GiteaClient instance + """ + self.gitea = gitea_client + + def _get_project_directory(self) -> Optional[str]: + """ + Get the user's project directory from environment. + + Returns: + Project directory path or None if not set + """ + return os.environ.get('CLAUDE_PROJECT_DIR') + + def _get_current_branch(self) -> str: + """ + Get current git branch from user's project directory. + + Uses CLAUDE_PROJECT_DIR environment variable to determine the correct + directory for git operations, avoiding the bug where git runs from + the installed plugin directory instead of the user's project. + + Returns: + Current branch name or 'unknown' if not in a git repo + """ + try: + project_dir = self._get_project_directory() + result = subprocess.run( + ['git', 'rev-parse', '--abbrev-ref', 'HEAD'], + capture_output=True, + text=True, + check=True, + cwd=project_dir # Run git in project directory, not plugin directory + ) + return result.stdout.strip() + except subprocess.CalledProcessError: + return "unknown" + + def _check_branch_permissions(self, operation: str) -> bool: + """ + Check if operation is allowed on current branch. + + Args: + operation: Operation name (list_prs, create_review, etc.) + + Returns: + True if operation is allowed, False otherwise + """ + branch = self._get_current_branch() + + # Read-only operations allowed everywhere + read_ops = ['list_pull_requests', 'get_pull_request', 'get_pr_diff', 'get_pr_comments'] + + # Production branches (read-only) + if branch in ['main', 'master'] or branch.startswith('prod/'): + return operation in read_ops + + # Staging branches (read-only for PRs, can comment) + if branch == 'staging' or branch.startswith('stage/'): + return operation in read_ops + ['add_pr_comment'] + + # Development branches (full access) + # Include all common feature/fix branch patterns + dev_prefixes = ( + 'feat/', 'feature/', 'dev/', + 'fix/', 'bugfix/', 'hotfix/', + 'chore/', 'refactor/', 'docs/', 'test/' + ) + if branch in ['development', 'develop'] or branch.startswith(dev_prefixes): + return True + + # Unknown branch - be restrictive + return operation in read_ops + + async def list_pull_requests( + self, + state: str = 'open', + sort: str = 'recentupdate', + labels: Optional[List[str]] = None, + repo: Optional[str] = None + ) -> List[Dict]: + """ + List pull requests from repository (async wrapper). + + Args: + state: PR state (open, closed, all) + sort: Sort order + labels: Filter by labels + repo: Override configured repo (for PMO multi-repo) + + Returns: + List of pull request dictionaries + + Raises: + PermissionError: If operation not allowed on current branch + """ + if not self._check_branch_permissions('list_pull_requests'): + branch = self._get_current_branch() + raise PermissionError( + f"Cannot list PRs on branch '{branch}'. " + f"Switch to a development branch." + ) + + loop = asyncio.get_event_loop() + return await loop.run_in_executor( + None, + lambda: self.gitea.list_pull_requests(state, sort, labels, repo) + ) + + async def get_pull_request( + self, + pr_number: int, + repo: Optional[str] = None + ) -> Dict: + """ + Get specific pull request details (async wrapper). + + Args: + pr_number: Pull request number + repo: Override configured repo (for PMO multi-repo) + + Returns: + Pull request dictionary + + Raises: + PermissionError: If operation not allowed on current branch + """ + if not self._check_branch_permissions('get_pull_request'): + branch = self._get_current_branch() + raise PermissionError( + f"Cannot get PR on branch '{branch}'. " + f"Switch to a development branch." + ) + + loop = asyncio.get_event_loop() + return await loop.run_in_executor( + None, + lambda: self.gitea.get_pull_request(pr_number, repo) + ) + + async def get_pr_diff( + self, + pr_number: int, + repo: Optional[str] = None + ) -> str: + """ + Get pull request diff (async wrapper). + + Args: + pr_number: Pull request number + repo: Override configured repo (for PMO multi-repo) + + Returns: + Diff as string + + Raises: + PermissionError: If operation not allowed on current branch + """ + if not self._check_branch_permissions('get_pr_diff'): + branch = self._get_current_branch() + raise PermissionError( + f"Cannot get PR diff on branch '{branch}'. " + f"Switch to a development branch." + ) + + loop = asyncio.get_event_loop() + return await loop.run_in_executor( + None, + lambda: self.gitea.get_pr_diff(pr_number, repo) + ) + + async def get_pr_comments( + self, + pr_number: int, + repo: Optional[str] = None + ) -> List[Dict]: + """ + Get comments on a pull request (async wrapper). + + Args: + pr_number: Pull request number + repo: Override configured repo (for PMO multi-repo) + + Returns: + List of comment dictionaries + + Raises: + PermissionError: If operation not allowed on current branch + """ + if not self._check_branch_permissions('get_pr_comments'): + branch = self._get_current_branch() + raise PermissionError( + f"Cannot get PR comments on branch '{branch}'. " + f"Switch to a development branch." + ) + + loop = asyncio.get_event_loop() + return await loop.run_in_executor( + None, + lambda: self.gitea.get_pr_comments(pr_number, repo) + ) + + async def create_pr_review( + self, + pr_number: int, + body: str, + event: str = 'COMMENT', + comments: Optional[List[Dict]] = None, + repo: Optional[str] = None + ) -> Dict: + """ + Create a review on a pull request (async wrapper with branch check). + + Args: + pr_number: Pull request number + body: Review body/summary + event: Review action (APPROVE, REQUEST_CHANGES, COMMENT) + comments: Optional list of inline comments + repo: Override configured repo (for PMO multi-repo) + + Returns: + Created review dictionary + + Raises: + PermissionError: If operation not allowed on current branch + """ + if not self._check_branch_permissions('create_pr_review'): + branch = self._get_current_branch() + raise PermissionError( + f"Cannot create PR review on branch '{branch}'. " + f"Switch to a development branch to review PRs." + ) + + loop = asyncio.get_event_loop() + return await loop.run_in_executor( + None, + lambda: self.gitea.create_pr_review(pr_number, body, event, comments, repo) + ) + + async def add_pr_comment( + self, + pr_number: int, + body: str, + repo: Optional[str] = None + ) -> Dict: + """ + Add a general comment to a pull request (async wrapper with branch check). + + Args: + pr_number: Pull request number + body: Comment text + repo: Override configured repo (for PMO multi-repo) + + Returns: + Created comment dictionary + + Raises: + PermissionError: If operation not allowed on current branch + """ + if not self._check_branch_permissions('add_pr_comment'): + branch = self._get_current_branch() + raise PermissionError( + f"Cannot add PR comment on branch '{branch}'. " + f"Switch to a development or staging branch to comment on PRs." + ) + + loop = asyncio.get_event_loop() + return await loop.run_in_executor( + None, + lambda: self.gitea.add_pr_comment(pr_number, body, repo) + ) + + async def create_pull_request( + self, + title: str, + body: str, + head: str, + base: str, + labels: Optional[List[str]] = None, + repo: Optional[str] = None + ) -> Dict: + """ + Create a new pull request (async wrapper with branch check). + + Args: + title: PR title + body: PR description/body + head: Source branch name (the branch with changes) + base: Target branch name (the branch to merge into) + labels: Optional list of label names + repo: Override configured repo (for PMO multi-repo) + + Returns: + Created pull request dictionary + + Raises: + PermissionError: If operation not allowed on current branch + """ + if not self._check_branch_permissions('create_pull_request'): + branch = self._get_current_branch() + raise PermissionError( + f"Cannot create PR on branch '{branch}'. " + f"Switch to a development or feature branch to create PRs." + ) + + loop = asyncio.get_event_loop() + return await loop.run_in_executor( + None, + lambda: self.gitea.create_pull_request(title, body, head, base, labels, repo) + ) diff --git a/gitea_mcp/tools/wiki.py b/gitea_mcp/tools/wiki.py new file mode 100644 index 0000000..36f389e --- /dev/null +++ b/gitea_mcp/tools/wiki.py @@ -0,0 +1,187 @@ +""" +Wiki management tools for MCP server. + +Provides async wrappers for wiki operations to support lessons learned: +- Page CRUD operations +- Lessons learned creation and search +- RFC number allocation +""" +import asyncio +import logging +import re +from typing import List, Dict, Optional + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +class WikiTools: + """Async wrappers for Gitea wiki operations""" + + def __init__(self, gitea_client): + """ + Initialize wiki tools. + + Args: + gitea_client: GiteaClient instance + """ + self.gitea = gitea_client + + async def list_wiki_pages(self, repo: Optional[str] = None) -> List[Dict]: + """List all wiki pages in repository.""" + loop = asyncio.get_event_loop() + return await loop.run_in_executor( + None, + lambda: self.gitea.list_wiki_pages(repo) + ) + + async def get_wiki_page( + self, + page_name: str, + repo: Optional[str] = None + ) -> Dict: + """Get a specific wiki page by name.""" + loop = asyncio.get_event_loop() + return await loop.run_in_executor( + None, + lambda: self.gitea.get_wiki_page(page_name, repo) + ) + + async def create_wiki_page( + self, + title: str, + content: str, + repo: Optional[str] = None + ) -> Dict: + """Create a new wiki page.""" + loop = asyncio.get_event_loop() + return await loop.run_in_executor( + None, + lambda: self.gitea.create_wiki_page(title, content, repo) + ) + + async def update_wiki_page( + self, + page_name: str, + content: str, + repo: Optional[str] = None + ) -> Dict: + """Update an existing wiki page.""" + loop = asyncio.get_event_loop() + return await loop.run_in_executor( + None, + lambda: self.gitea.update_wiki_page(page_name, content, repo) + ) + + async def delete_wiki_page( + self, + page_name: str, + repo: Optional[str] = None + ) -> bool: + """Delete a wiki page.""" + loop = asyncio.get_event_loop() + return await loop.run_in_executor( + None, + lambda: self.gitea.delete_wiki_page(page_name, repo) + ) + + async def search_wiki_pages( + self, + query: str, + repo: Optional[str] = None + ) -> List[Dict]: + """Search wiki pages by title.""" + loop = asyncio.get_event_loop() + return await loop.run_in_executor( + None, + lambda: self.gitea.search_wiki_pages(query, repo) + ) + + async def create_lesson( + self, + title: str, + content: str, + tags: List[str], + category: str = "sprints", + repo: Optional[str] = None + ) -> Dict: + """ + Create a lessons learned entry in the wiki. + + Args: + title: Lesson title (e.g., "Sprint 16 - Prevent Infinite Loops") + content: Lesson content in markdown + tags: List of tags for categorization + category: Category (sprints, patterns, architecture, etc.) + repo: Repository in owner/repo format + + Returns: + Created wiki page + """ + loop = asyncio.get_event_loop() + return await loop.run_in_executor( + None, + lambda: self.gitea.create_lesson(title, content, tags, category, repo) + ) + + async def search_lessons( + self, + query: Optional[str] = None, + tags: Optional[List[str]] = None, + limit: int = 20, + repo: Optional[str] = None + ) -> List[Dict]: + """ + Search lessons learned from previous sprints. + + Args: + query: Search query (optional) + tags: Tags to filter by (optional) + limit: Maximum results (default 20) + repo: Repository in owner/repo format + + Returns: + List of matching lessons + """ + loop = asyncio.get_event_loop() + results = await loop.run_in_executor( + None, + lambda: self.gitea.search_lessons(query, tags, repo) + ) + return results[:limit] + + async def allocate_rfc_number(self, repo: Optional[str] = None) -> Dict: + """ + Allocate the next available RFC number. + + Scans existing wiki pages for RFC-NNNN pattern and returns + the next sequential number. + + Args: + repo: Repository in owner/repo format + + Returns: + Dict with 'next_number' (int) and 'formatted' (str like 'RFC-0001') + """ + pages = await self.list_wiki_pages(repo) + + # Extract RFC numbers from page titles + rfc_numbers = [] + rfc_pattern = re.compile(r'^RFC-(\d{4})') + + for page in pages: + title = page.get('title', '') + match = rfc_pattern.match(title) + if match: + rfc_numbers.append(int(match.group(1))) + + # Calculate next number + if rfc_numbers: + next_num = max(rfc_numbers) + 1 + else: + next_num = 1 + + return { + 'next_number': next_num, + 'formatted': f'RFC-{next_num:04d}' + } diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..0902751 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,40 @@ +[build-system] +requires = ["setuptools>=61.0", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "gitea-mcp" +version = "1.0.0" +description = "MCP server for Gitea — issues, PRs, wikis, milestones, merges" +readme = "README.md" +requires-python = ">=3.10" +license = {text = "MIT"} +authors = [ + { name = "Leo Miranda" } +] +keywords = ["mcp", "gitea", "claude", "tools", "model-context-protocol"] + +dependencies = [ + "mcp>=0.9.0", + "python-dotenv>=1.0.0", + "requests>=2.31.0", + "pydantic>=2.5.0", +] + +[project.optional-dependencies] +dev = [ + "pytest>=7.4.3", + "pytest-asyncio>=0.23.0", + "pytest-cov>=4.0.0", +] + +[project.scripts] +gitea-mcp = "gitea_mcp.server:main" + +[tool.setuptools.packages.find] +where = ["."] +include = ["gitea_mcp*"] + +[tool.pytest.ini_options] +asyncio_mode = "auto" +testpaths = ["tests"] diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..c61ab9a --- /dev/null +++ b/requirements.txt @@ -0,0 +1,4 @@ +mcp>=0.9.0 +python-dotenv>=1.0.0 +requests>=2.31.0 +pydantic>=2.5.0 diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_config.py b/tests/test_config.py new file mode 100644 index 0000000..d9c1be5 --- /dev/null +++ b/tests/test_config.py @@ -0,0 +1,259 @@ +""" +Unit tests for configuration loader. +""" +import pytest +from pathlib import Path +import os +from gitea_mcp.config import GiteaConfig + + +def test_load_system_config(tmp_path, monkeypatch): + """Test loading system-level configuration""" + # Mock home directory + config_dir = tmp_path / '.config' / 'claude' + config_dir.mkdir(parents=True) + + config_file = config_dir / 'gitea.env' + config_file.write_text( + "GITEA_API_URL=https://test.com/api/v1\n" + "GITEA_API_TOKEN=test_token\n" + "GITEA_OWNER=test_owner\n" + ) + + monkeypatch.setenv('HOME', str(tmp_path)) + monkeypatch.chdir(tmp_path) + + config = GiteaConfig() + result = config.load() + + assert result['api_url'] == 'https://test.com/api/v1' + assert result['api_token'] == 'test_token' + assert result['mode'] == 'company' # No repo specified + assert result['repo'] is None + + +def test_project_config_override(tmp_path, monkeypatch): + """Test that project config overrides system config""" + # Set up system config + system_config_dir = tmp_path / '.config' / 'claude' + system_config_dir.mkdir(parents=True) + + system_config = system_config_dir / 'gitea.env' + system_config.write_text( + "GITEA_API_URL=https://test.com/api/v1\n" + "GITEA_API_TOKEN=test_token\n" + "GITEA_OWNER=test_owner\n" + ) + + # Set up project config + project_dir = tmp_path / 'project' + project_dir.mkdir() + + project_config = project_dir / '.env' + project_config.write_text("GITEA_REPO=test_repo\n") + + monkeypatch.setenv('HOME', str(tmp_path)) + monkeypatch.chdir(project_dir) + + config = GiteaConfig() + result = config.load() + + assert result['repo'] == 'test_repo' + assert result['mode'] == 'project' + + +def test_missing_system_config(tmp_path, monkeypatch): + """Test error handling for missing system configuration""" + monkeypatch.setenv('HOME', str(tmp_path)) + monkeypatch.chdir(tmp_path) + + with pytest.raises(FileNotFoundError) as exc_info: + config = GiteaConfig() + config.load() + + assert "System config not found" in str(exc_info.value) + + +def test_missing_required_config(tmp_path, monkeypatch): + """Test error handling for missing required variables""" + # Clear environment variables + for var in ['GITEA_API_URL', 'GITEA_API_TOKEN', 'GITEA_OWNER', 'GITEA_REPO']: + monkeypatch.delenv(var, raising=False) + + # Create incomplete config + config_dir = tmp_path / '.config' / 'claude' + config_dir.mkdir(parents=True) + + config_file = config_dir / 'gitea.env' + config_file.write_text( + "GITEA_API_URL=https://test.com/api/v1\n" + # Missing GITEA_API_TOKEN and GITEA_OWNER + ) + + monkeypatch.setenv('HOME', str(tmp_path)) + monkeypatch.chdir(tmp_path) + + with pytest.raises(ValueError) as exc_info: + config = GiteaConfig() + config.load() + + assert "Missing required configuration" in str(exc_info.value) + + +def test_mode_detection_project(tmp_path, monkeypatch): + """Test mode detection for project mode""" + config_dir = tmp_path / '.config' / 'claude' + config_dir.mkdir(parents=True) + + config_file = config_dir / 'gitea.env' + config_file.write_text( + "GITEA_API_URL=https://test.com/api/v1\n" + "GITEA_API_TOKEN=test_token\n" + "GITEA_OWNER=test_owner\n" + "GITEA_REPO=test_repo\n" + ) + + monkeypatch.setenv('HOME', str(tmp_path)) + monkeypatch.chdir(tmp_path) + + config = GiteaConfig() + result = config.load() + + assert result['mode'] == 'project' + assert result['repo'] == 'test_repo' + + +def test_mode_detection_company(tmp_path, monkeypatch): + """Test mode detection for company mode (PMO)""" + # Clear environment variables, especially GITEA_REPO + for var in ['GITEA_API_URL', 'GITEA_API_TOKEN', 'GITEA_OWNER', 'GITEA_REPO']: + monkeypatch.delenv(var, raising=False) + + config_dir = tmp_path / '.config' / 'claude' + config_dir.mkdir(parents=True) + + config_file = config_dir / 'gitea.env' + config_file.write_text( + "GITEA_API_URL=https://test.com/api/v1\n" + "GITEA_API_TOKEN=test_token\n" + "GITEA_OWNER=test_owner\n" + # No GITEA_REPO + ) + + monkeypatch.setenv('HOME', str(tmp_path)) + monkeypatch.chdir(tmp_path) + + config = GiteaConfig() + result = config.load() + + assert result['mode'] == 'company' + assert result['repo'] is None + + +# ======================================== +# GIT URL PARSING TESTS +# ======================================== + +def test_parse_git_url_ssh_format(): + """Test parsing SSH format git URL""" + config = GiteaConfig() + + # SSH with port: ssh://git@host:port/owner/repo.git + url = "ssh://git@hotserv.tailc9b278.ts.net:2222/personal-projects/personal-portfolio.git" + result = config._parse_git_url(url) + assert result == "personal-projects/personal-portfolio" + + +def test_parse_git_url_ssh_short_format(): + """Test parsing SSH short format git URL""" + config = GiteaConfig() + + # SSH short: git@host:owner/repo.git + url = "git@github.com:owner/repo.git" + result = config._parse_git_url(url) + assert result == "owner/repo" + + +def test_parse_git_url_https_format(): + """Test parsing HTTPS format git URL""" + config = GiteaConfig() + + # HTTPS: https://host/owner/repo.git + url = "https://gitea.hotserv.cloud/personal-projects/leo-claude-mktplace.git" + result = config._parse_git_url(url) + assert result == "personal-projects/leo-claude-mktplace" + + +def test_parse_git_url_http_format(): + """Test parsing HTTP format git URL""" + config = GiteaConfig() + + # HTTP: http://host/owner/repo.git + url = "http://gitea.hotserv.cloud/personal-projects/repo.git" + result = config._parse_git_url(url) + assert result == "personal-projects/repo" + + +def test_parse_git_url_without_git_suffix(): + """Test parsing git URL without .git suffix""" + config = GiteaConfig() + + url = "https://github.com/owner/repo" + result = config._parse_git_url(url) + assert result == "owner/repo" + + +def test_parse_git_url_invalid_format(): + """Test parsing invalid git URL returns None""" + config = GiteaConfig() + + url = "not-a-valid-url" + result = config._parse_git_url(url) + assert result is None + + +def test_find_project_directory_from_env(tmp_path, monkeypatch): + """Test finding project directory from CLAUDE_PROJECT_DIR env var""" + project_dir = tmp_path / 'my-project' + project_dir.mkdir() + (project_dir / '.git').mkdir() + + monkeypatch.setenv('CLAUDE_PROJECT_DIR', str(project_dir)) + + config = GiteaConfig() + result = config._find_project_directory() + + assert result == project_dir + + +def test_find_project_directory_from_cwd(tmp_path, monkeypatch): + """Test finding project directory from cwd with .env file""" + project_dir = tmp_path / 'project' + project_dir.mkdir() + (project_dir / '.env').write_text("GITEA_REPO=test/repo") + + monkeypatch.chdir(project_dir) + # Clear env vars that might interfere + monkeypatch.delenv('CLAUDE_PROJECT_DIR', raising=False) + monkeypatch.delenv('PWD', raising=False) + + config = GiteaConfig() + result = config._find_project_directory() + + assert result == project_dir + + +def test_find_project_directory_none_when_no_markers(tmp_path, monkeypatch): + """Test returns None when no project markers found""" + empty_dir = tmp_path / 'empty' + empty_dir.mkdir() + + monkeypatch.chdir(empty_dir) + monkeypatch.delenv('CLAUDE_PROJECT_DIR', raising=False) + monkeypatch.delenv('PWD', raising=False) + monkeypatch.delenv('GITEA_REPO', raising=False) + + config = GiteaConfig() + result = config._find_project_directory() + + assert result is None diff --git a/tests/test_gitea_client.py b/tests/test_gitea_client.py new file mode 100644 index 0000000..43c5a93 --- /dev/null +++ b/tests/test_gitea_client.py @@ -0,0 +1,270 @@ +""" +Unit tests for Gitea API client. +""" +import pytest +from unittest.mock import Mock, patch, MagicMock +from gitea_mcp.gitea_client import GiteaClient + + +@pytest.fixture +def mock_config(): + """Fixture providing mocked configuration""" + with patch('gitea_mcp.gitea_client.GiteaConfig') as mock_cfg: + mock_instance = mock_cfg.return_value + mock_instance.load.return_value = { + 'api_url': 'https://test.com/api/v1', + 'api_token': 'test_token', + 'repo': 'test_owner/test_repo', # Combined owner/repo format + 'mode': 'project' + } + yield mock_cfg + + +@pytest.fixture +def gitea_client(mock_config): + """Fixture providing GiteaClient instance with mocked config""" + return GiteaClient() + + +def test_client_initialization(gitea_client): + """Test client initializes with correct configuration""" + assert gitea_client.base_url == 'https://test.com/api/v1' + assert gitea_client.token == 'test_token' + assert gitea_client.repo == 'test_owner/test_repo' # Combined format + assert gitea_client.mode == 'project' + assert 'Authorization' in gitea_client.session.headers + assert gitea_client.session.headers['Authorization'] == 'token test_token' + + +def test_list_issues(gitea_client): + """Test listing issues""" + mock_response = Mock() + mock_response.json.return_value = [ + {'number': 1, 'title': 'Test Issue 1'}, + {'number': 2, 'title': 'Test Issue 2'} + ] + mock_response.raise_for_status = Mock() + + with patch.object(gitea_client.session, 'get', return_value=mock_response): + issues = gitea_client.list_issues(state='open') + + assert len(issues) == 2 + assert issues[0]['title'] == 'Test Issue 1' + gitea_client.session.get.assert_called_once() + + +def test_list_issues_with_labels(gitea_client): + """Test listing issues with label filter""" + mock_response = Mock() + mock_response.json.return_value = [{'number': 1, 'title': 'Bug Issue'}] + mock_response.raise_for_status = Mock() + + with patch.object(gitea_client.session, 'get', return_value=mock_response): + issues = gitea_client.list_issues(state='open', labels=['Type/Bug']) + + gitea_client.session.get.assert_called_once() + call_args = gitea_client.session.get.call_args + assert call_args[1]['params']['labels'] == 'Type/Bug' + + +def test_get_issue(gitea_client): + """Test getting specific issue""" + mock_response = Mock() + mock_response.json.return_value = {'number': 1, 'title': 'Test Issue'} + mock_response.raise_for_status = Mock() + + with patch.object(gitea_client.session, 'get', return_value=mock_response): + issue = gitea_client.get_issue(1) + + assert issue['number'] == 1 + assert issue['title'] == 'Test Issue' + + +def test_create_issue(gitea_client): + """Test creating new issue""" + mock_response = Mock() + mock_response.json.return_value = { + 'number': 1, + 'title': 'New Issue', + 'body': 'Issue body' + } + mock_response.raise_for_status = Mock() + + # Mock is_org_repo to avoid network call during label resolution + with patch.object(gitea_client, 'is_org_repo', return_value=True): + # Mock get_org_labels and get_labels for label resolution + with patch.object(gitea_client, 'get_org_labels', return_value=[{'name': 'Type/Bug', 'id': 1}]): + with patch.object(gitea_client, 'get_labels', return_value=[]): + with patch.object(gitea_client.session, 'post', return_value=mock_response): + issue = gitea_client.create_issue( + title='New Issue', + body='Issue body', + labels=['Type/Bug'] + ) + + assert issue['title'] == 'New Issue' + gitea_client.session.post.assert_called_once() + + +def test_update_issue(gitea_client): + """Test updating existing issue""" + mock_response = Mock() + mock_response.json.return_value = { + 'number': 1, + 'title': 'Updated Issue' + } + mock_response.raise_for_status = Mock() + + with patch.object(gitea_client.session, 'patch', return_value=mock_response): + issue = gitea_client.update_issue( + issue_number=1, + title='Updated Issue' + ) + + assert issue['title'] == 'Updated Issue' + gitea_client.session.patch.assert_called_once() + + +def test_add_comment(gitea_client): + """Test adding comment to issue""" + mock_response = Mock() + mock_response.json.return_value = {'body': 'Test comment'} + mock_response.raise_for_status = Mock() + + with patch.object(gitea_client.session, 'post', return_value=mock_response): + comment = gitea_client.add_comment(1, 'Test comment') + + assert comment['body'] == 'Test comment' + gitea_client.session.post.assert_called_once() + + +def test_get_labels(gitea_client): + """Test getting repository labels""" + mock_response = Mock() + mock_response.json.return_value = [ + {'name': 'Type/Bug'}, + {'name': 'Priority/High'} + ] + mock_response.raise_for_status = Mock() + + with patch.object(gitea_client.session, 'get', return_value=mock_response): + labels = gitea_client.get_labels() + + assert len(labels) == 2 + assert labels[0]['name'] == 'Type/Bug' + + +def test_get_org_labels(gitea_client): + """Test getting organization labels""" + mock_response = Mock() + mock_response.json.return_value = [ + {'name': 'Type/Bug'}, + {'name': 'Type/Feature'} + ] + mock_response.raise_for_status = Mock() + + with patch.object(gitea_client.session, 'get', return_value=mock_response): + labels = gitea_client.get_org_labels(org='test_owner') + + assert len(labels) == 2 + + +def test_list_repos(gitea_client): + """Test listing organization repositories (PMO mode)""" + mock_response = Mock() + mock_response.json.return_value = [ + {'name': 'repo1'}, + {'name': 'repo2'} + ] + mock_response.raise_for_status = Mock() + + with patch.object(gitea_client.session, 'get', return_value=mock_response): + repos = gitea_client.list_repos(org='test_owner') + + assert len(repos) == 2 + assert repos[0]['name'] == 'repo1' + + +def test_aggregate_issues(gitea_client): + """Test aggregating issues across repositories (PMO mode)""" + # Mock list_repos + gitea_client.list_repos = Mock(return_value=[ + {'name': 'repo1'}, + {'name': 'repo2'} + ]) + + # Mock list_issues + gitea_client.list_issues = Mock(side_effect=[ + [{'number': 1, 'title': 'Issue 1'}], # repo1 + [{'number': 2, 'title': 'Issue 2'}] # repo2 + ]) + + aggregated = gitea_client.aggregate_issues(org='test_owner', state='open') + + assert 'repo1' in aggregated + assert 'repo2' in aggregated + assert len(aggregated['repo1']) == 1 + assert len(aggregated['repo2']) == 1 + + +def test_no_repo_specified_error(gitea_client): + """Test error when repository not specified or invalid format""" + # Create client without repo + with patch('gitea_mcp.gitea_client.GiteaConfig') as mock_cfg: + mock_instance = mock_cfg.return_value + mock_instance.load.return_value = { + 'api_url': 'https://test.com/api/v1', + 'api_token': 'test_token', + 'repo': None, # No repo + 'mode': 'company' + } + client = GiteaClient() + + with pytest.raises(ValueError) as exc_info: + client.list_issues() + + assert "Use 'owner/repo' format" in str(exc_info.value) + + +# ======================================== +# ORGANIZATION DETECTION TESTS +# ======================================== + +def test_is_organization_true(gitea_client): + """Test _is_organization returns True for valid organization""" + mock_response = Mock() + mock_response.status_code = 200 + + with patch.object(gitea_client.session, 'get', return_value=mock_response): + result = gitea_client._is_organization('personal-projects') + + assert result is True + gitea_client.session.get.assert_called_once_with( + 'https://test.com/api/v1/orgs/personal-projects' + ) + + +def test_is_organization_false(gitea_client): + """Test _is_organization returns False for user account""" + mock_response = Mock() + mock_response.status_code = 404 + + with patch.object(gitea_client.session, 'get', return_value=mock_response): + result = gitea_client._is_organization('lmiranda') + + assert result is False + + +def test_is_org_repo_uses_orgs_endpoint(gitea_client): + """Test is_org_repo uses /orgs endpoint instead of owner.type""" + mock_response = Mock() + mock_response.status_code = 200 + + with patch.object(gitea_client.session, 'get', return_value=mock_response): + result = gitea_client.is_org_repo('personal-projects/repo') + + assert result is True + # Should call /orgs/personal-projects, not /repos/.../ + gitea_client.session.get.assert_called_once_with( + 'https://test.com/api/v1/orgs/personal-projects' + ) diff --git a/tests/test_issues.py b/tests/test_issues.py new file mode 100644 index 0000000..3a7e9b7 --- /dev/null +++ b/tests/test_issues.py @@ -0,0 +1,163 @@ +""" +Unit tests for issue tools with branch detection. +""" +import pytest +from unittest.mock import Mock, patch, AsyncMock +from gitea_mcp.tools.issues import IssueTools + + +@pytest.fixture +def mock_gitea_client(): + """Fixture providing mocked Gitea client""" + client = Mock() + client.mode = 'project' + return client + + +@pytest.fixture +def issue_tools(mock_gitea_client): + """Fixture providing IssueTools instance""" + return IssueTools(mock_gitea_client) + + +@pytest.mark.asyncio +async def test_list_issues_development_branch(issue_tools): + """Test listing issues on development branch (allowed)""" + with patch.object(issue_tools, '_get_current_branch', return_value='feat/test-feature'): + issue_tools.gitea.list_issues = Mock(return_value=[{'number': 1}]) + + issues = await issue_tools.list_issues(state='open') + + assert len(issues) == 1 + issue_tools.gitea.list_issues.assert_called_once() + + +@pytest.mark.asyncio +async def test_create_issue_development_branch(issue_tools): + """Test creating issue on development branch (allowed)""" + with patch.object(issue_tools, '_get_current_branch', return_value='development'): + issue_tools.gitea.create_issue = Mock(return_value={'number': 1}) + + issue = await issue_tools.create_issue('Test', 'Body') + + assert issue['number'] == 1 + issue_tools.gitea.create_issue.assert_called_once() + + +@pytest.mark.asyncio +async def test_create_issue_main_branch_blocked(issue_tools): + """Test creating issue on main branch (blocked)""" + with patch.object(issue_tools, '_get_current_branch', return_value='main'): + with pytest.raises(PermissionError) as exc_info: + await issue_tools.create_issue('Test', 'Body') + + assert "Cannot create issues on branch 'main'" in str(exc_info.value) + + +@pytest.mark.asyncio +async def test_create_issue_staging_branch_allowed(issue_tools): + """Test creating issue on staging branch (allowed for documentation)""" + with patch.object(issue_tools, '_get_current_branch', return_value='staging'): + issue_tools.gitea.create_issue = Mock(return_value={'number': 1}) + + issue = await issue_tools.create_issue('Test', 'Body') + + assert issue['number'] == 1 + + +@pytest.mark.asyncio +async def test_update_issue_main_branch_blocked(issue_tools): + """Test updating issue on main branch (blocked)""" + with patch.object(issue_tools, '_get_current_branch', return_value='main'): + with pytest.raises(PermissionError) as exc_info: + await issue_tools.update_issue(1, title='Updated') + + assert "Cannot update issues on branch 'main'" in str(exc_info.value) + + +@pytest.mark.asyncio +async def test_list_issues_main_branch_allowed(issue_tools): + """Test listing issues on main branch (allowed - read-only)""" + with patch.object(issue_tools, '_get_current_branch', return_value='main'): + issue_tools.gitea.list_issues = Mock(return_value=[{'number': 1}]) + + issues = await issue_tools.list_issues(state='open') + + assert len(issues) == 1 + + +@pytest.mark.asyncio +async def test_get_issue(issue_tools): + """Test getting specific issue""" + with patch.object(issue_tools, '_get_current_branch', return_value='development'): + issue_tools.gitea.get_issue = Mock(return_value={'number': 1, 'title': 'Test'}) + + issue = await issue_tools.get_issue(1) + + assert issue['number'] == 1 + + +@pytest.mark.asyncio +async def test_add_comment(issue_tools): + """Test adding comment to issue""" + with patch.object(issue_tools, '_get_current_branch', return_value='development'): + issue_tools.gitea.add_comment = Mock(return_value={'body': 'Test comment'}) + + comment = await issue_tools.add_comment(1, 'Test comment') + + assert comment['body'] == 'Test comment' + + +@pytest.mark.asyncio +async def test_aggregate_issues_company_mode(issue_tools): + """Test aggregating issues in company mode""" + issue_tools.gitea.mode = 'company' + + with patch.object(issue_tools, '_get_current_branch', return_value='development'): + issue_tools.gitea.aggregate_issues = Mock(return_value={ + 'repo1': [{'number': 1}], + 'repo2': [{'number': 2}] + }) + + aggregated = await issue_tools.aggregate_issues(org='test_owner') + + assert 'repo1' in aggregated + assert 'repo2' in aggregated + + +@pytest.mark.asyncio +async def test_aggregate_issues_project_mode(issue_tools): + """Test that aggregate_issues works in project mode with org argument""" + issue_tools.gitea.mode = 'project' + + with patch.object(issue_tools, '_get_current_branch', return_value='development'): + issue_tools.gitea.aggregate_issues = Mock(return_value={ + 'repo1': [{'number': 1}] + }) + + # aggregate_issues now works in any mode when org is provided + aggregated = await issue_tools.aggregate_issues(org='test_owner') + + assert 'repo1' in aggregated + + +def test_branch_detection(): + """Test branch detection logic""" + tools = IssueTools(Mock()) + + # Test development branches + with patch.object(tools, '_get_current_branch', return_value='development'): + assert tools._check_branch_permissions('create_issue') is True + + with patch.object(tools, '_get_current_branch', return_value='feat/new-feature'): + assert tools._check_branch_permissions('create_issue') is True + + # Test production branches + with patch.object(tools, '_get_current_branch', return_value='main'): + assert tools._check_branch_permissions('create_issue') is False + assert tools._check_branch_permissions('list_issues') is True + + # Test staging branches + with patch.object(tools, '_get_current_branch', return_value='staging'): + assert tools._check_branch_permissions('create_issue') is True + assert tools._check_branch_permissions('update_issue') is False diff --git a/tests/test_labels.py b/tests/test_labels.py new file mode 100644 index 0000000..fefccbe --- /dev/null +++ b/tests/test_labels.py @@ -0,0 +1,478 @@ +""" +Unit tests for label tools with suggestion logic. +""" +import pytest +from unittest.mock import Mock, patch +from gitea_mcp.tools.labels import LabelTools + + +@pytest.fixture +def mock_gitea_client(): + """Fixture providing mocked Gitea client""" + client = Mock() + client.repo = 'test_org/test_repo' + client.is_org_repo = Mock(return_value=True) + return client + + +@pytest.fixture +def label_tools(mock_gitea_client): + """Fixture providing LabelTools instance""" + return LabelTools(mock_gitea_client) + + +@pytest.mark.asyncio +async def test_get_labels(label_tools): + """Test getting all labels (org + repo)""" + label_tools.gitea.get_org_labels = Mock(return_value=[ + {'name': 'Type/Bug'}, + {'name': 'Type/Feature'} + ]) + label_tools.gitea.get_labels = Mock(return_value=[ + {'name': 'Component/Backend'}, + {'name': 'Component/Frontend'} + ]) + + result = await label_tools.get_labels() + + assert len(result['organization']) == 2 + assert len(result['repository']) == 2 + assert result['total_count'] == 4 + + +# ======================================== +# LABEL LOOKUP TESTS (NEW) +# ======================================== + +def test_build_label_lookup_slash_format(): + """Test building label lookup with slash format labels""" + mock_client = Mock() + mock_client.repo = 'test/repo' + tools = LabelTools(mock_client) + + labels = ['Type/Bug', 'Type/Feature', 'Priority/High', 'Priority/Low'] + lookup = tools._build_label_lookup(labels) + + assert 'type' in lookup + assert 'bug' in lookup['type'] + assert lookup['type']['bug'] == 'Type/Bug' + assert lookup['type']['feature'] == 'Type/Feature' + assert 'priority' in lookup + assert lookup['priority']['high'] == 'Priority/High' + + +def test_build_label_lookup_colon_space_format(): + """Test building label lookup with colon-space format labels""" + mock_client = Mock() + mock_client.repo = 'test/repo' + tools = LabelTools(mock_client) + + labels = ['Type: Bug', 'Type: Feature', 'Priority: High', 'Effort: M'] + lookup = tools._build_label_lookup(labels) + + assert 'type' in lookup + assert 'bug' in lookup['type'] + assert lookup['type']['bug'] == 'Type: Bug' + assert lookup['type']['feature'] == 'Type: Feature' + assert 'priority' in lookup + assert lookup['priority']['high'] == 'Priority: High' + # Test singular "Effort" (not "Efforts") + assert 'effort' in lookup + assert lookup['effort']['m'] == 'Effort: M' + + +def test_build_label_lookup_efforts_normalization(): + """Test that 'Efforts' is normalized to 'effort' for matching""" + mock_client = Mock() + mock_client.repo = 'test/repo' + tools = LabelTools(mock_client) + + labels = ['Efforts/XS', 'Efforts/S', 'Efforts/M'] + lookup = tools._build_label_lookup(labels) + + # 'Efforts' should be normalized to 'effort' + assert 'effort' in lookup + assert lookup['effort']['xs'] == 'Efforts/XS' + + +def test_find_label(): + """Test finding labels from lookup""" + mock_client = Mock() + mock_client.repo = 'test/repo' + tools = LabelTools(mock_client) + + lookup = { + 'type': {'bug': 'Type: Bug', 'feature': 'Type: Feature'}, + 'priority': {'high': 'Priority: High', 'low': 'Priority: Low'} + } + + assert tools._find_label(lookup, 'type', 'bug') == 'Type: Bug' + assert tools._find_label(lookup, 'priority', 'high') == 'Priority: High' + assert tools._find_label(lookup, 'type', 'nonexistent') is None + assert tools._find_label(lookup, 'nonexistent', 'bug') is None + + +# ======================================== +# SUGGEST LABELS WITH DYNAMIC FORMAT TESTS +# ======================================== + +def _create_tools_with_labels(labels): + """Helper to create LabelTools with mocked labels""" + import asyncio + mock_client = Mock() + mock_client.repo = 'test/repo' + mock_client.is_org_repo = Mock(return_value=False) + mock_client.get_labels = Mock(return_value=[{'name': l} for l in labels]) + return LabelTools(mock_client) + + +@pytest.mark.asyncio +async def test_suggest_labels_with_slash_format(): + """Test label suggestion with slash format labels""" + labels = [ + 'Type/Bug', 'Type/Feature', 'Type/Refactor', + 'Priority/Critical', 'Priority/High', 'Priority/Medium', 'Priority/Low', + 'Complexity/Simple', 'Complexity/Medium', 'Complexity/Complex', + 'Component/Auth' + ] + tools = _create_tools_with_labels(labels) + + context = "Fix critical bug in login authentication" + suggestions = await tools.suggest_labels(context) + + assert 'Type/Bug' in suggestions + assert 'Priority/Critical' in suggestions + assert 'Component/Auth' in suggestions + + +@pytest.mark.asyncio +async def test_suggest_labels_with_colon_space_format(): + """Test label suggestion with colon-space format labels""" + labels = [ + 'Type: Bug', 'Type: Feature', 'Type: Refactor', + 'Priority: Critical', 'Priority: High', 'Priority: Medium', 'Priority: Low', + 'Complexity: Simple', 'Complexity: Medium', 'Complexity: Complex', + 'Effort: XS', 'Effort: S', 'Effort: M', 'Effort: L', 'Effort: XL' + ] + tools = _create_tools_with_labels(labels) + + context = "Fix critical bug for tiny 1 hour fix" + suggestions = await tools.suggest_labels(context) + + # Should return colon-space format labels + assert 'Type: Bug' in suggestions + assert 'Priority: Critical' in suggestions + assert 'Effort: XS' in suggestions + + +@pytest.mark.asyncio +async def test_suggest_labels_bug(): + """Test label suggestion for bug context""" + labels = [ + 'Type/Bug', 'Type/Feature', + 'Priority/Critical', 'Priority/High', 'Priority/Medium', 'Priority/Low', + 'Complexity/Simple', 'Complexity/Medium', 'Complexity/Complex', + 'Component/Auth' + ] + tools = _create_tools_with_labels(labels) + + context = "Fix critical bug in login authentication" + suggestions = await tools.suggest_labels(context) + + assert 'Type/Bug' in suggestions + assert 'Priority/Critical' in suggestions + assert 'Component/Auth' in suggestions + + +@pytest.mark.asyncio +async def test_suggest_labels_feature(): + """Test label suggestion for feature context""" + labels = ['Type/Feature', 'Priority/Medium', 'Complexity/Medium'] + tools = _create_tools_with_labels(labels) + + context = "Add new feature to implement user dashboard" + suggestions = await tools.suggest_labels(context) + + assert 'Type/Feature' in suggestions + assert any('Priority' in label for label in suggestions) + + +@pytest.mark.asyncio +async def test_suggest_labels_refactor(): + """Test label suggestion for refactor context""" + labels = ['Type/Refactor', 'Priority/Medium', 'Complexity/Medium', 'Component/Backend'] + tools = _create_tools_with_labels(labels) + + context = "Refactor architecture to extract service layer" + suggestions = await tools.suggest_labels(context) + + assert 'Type/Refactor' in suggestions + assert 'Component/Backend' in suggestions + + +@pytest.mark.asyncio +async def test_suggest_labels_documentation(): + """Test label suggestion for documentation context""" + labels = ['Type/Documentation', 'Priority/Medium', 'Complexity/Medium', 'Component/API', 'Component/Docs'] + tools = _create_tools_with_labels(labels) + + context = "Update documentation for API endpoints" + suggestions = await tools.suggest_labels(context) + + assert 'Type/Documentation' in suggestions + assert 'Component/API' in suggestions or 'Component/Docs' in suggestions + + +@pytest.mark.asyncio +async def test_suggest_labels_priority(): + """Test priority detection in suggestions""" + labels = ['Type/Feature', 'Priority/Critical', 'Priority/High', 'Priority/Medium', 'Priority/Low', 'Complexity/Medium'] + tools = _create_tools_with_labels(labels) + + # Critical priority + context = "Urgent blocker in production" + suggestions = await tools.suggest_labels(context) + assert 'Priority/Critical' in suggestions + + # High priority + context = "Important feature needed asap" + suggestions = await tools.suggest_labels(context) + assert 'Priority/High' in suggestions + + # Low priority + context = "Nice-to-have optional improvement" + suggestions = await tools.suggest_labels(context) + assert 'Priority/Low' in suggestions + + +@pytest.mark.asyncio +async def test_suggest_labels_complexity(): + """Test complexity detection in suggestions""" + labels = ['Type/Feature', 'Priority/Medium', 'Complexity/Simple', 'Complexity/Medium', 'Complexity/Complex'] + tools = _create_tools_with_labels(labels) + + # Simple complexity + context = "Simple quick fix for typo" + suggestions = await tools.suggest_labels(context) + assert 'Complexity/Simple' in suggestions + + # Complex complexity + context = "Complex challenging architecture redesign" + suggestions = await tools.suggest_labels(context) + assert 'Complexity/Complex' in suggestions + + +@pytest.mark.asyncio +async def test_suggest_labels_efforts(): + """Test efforts detection in suggestions""" + labels = ['Type/Feature', 'Priority/Medium', 'Complexity/Medium', 'Efforts/XS', 'Efforts/S', 'Efforts/M', 'Efforts/L', 'Efforts/XL'] + tools = _create_tools_with_labels(labels) + + # XS effort + context = "Tiny fix that takes 1 hour" + suggestions = await tools.suggest_labels(context) + assert 'Efforts/XS' in suggestions + + # L effort + context = "Large feature taking 1 week" + suggestions = await tools.suggest_labels(context) + assert 'Efforts/L' in suggestions + + +@pytest.mark.asyncio +async def test_suggest_labels_components(): + """Test component detection in suggestions""" + labels = ['Type/Feature', 'Priority/Medium', 'Complexity/Medium', 'Component/Backend', 'Component/Frontend', 'Component/API', 'Component/Database'] + tools = _create_tools_with_labels(labels) + + # Backend component + context = "Update backend API service" + suggestions = await tools.suggest_labels(context) + assert 'Component/Backend' in suggestions + assert 'Component/API' in suggestions + + # Frontend component + context = "Fix frontend UI component" + suggestions = await tools.suggest_labels(context) + assert 'Component/Frontend' in suggestions + + # Database component + context = "Add database migration for schema" + suggestions = await tools.suggest_labels(context) + assert 'Component/Database' in suggestions + + +@pytest.mark.asyncio +async def test_suggest_labels_tech_stack(): + """Test tech stack detection in suggestions""" + labels = ['Type/Feature', 'Priority/Medium', 'Complexity/Medium', 'Tech/Python', 'Tech/FastAPI', 'Tech/Docker', 'Tech/PostgreSQL'] + tools = _create_tools_with_labels(labels) + + # Python + context = "Update Python FastAPI endpoint" + suggestions = await tools.suggest_labels(context) + assert 'Tech/Python' in suggestions + assert 'Tech/FastAPI' in suggestions + + # Docker + context = "Fix Dockerfile configuration" + suggestions = await tools.suggest_labels(context) + assert 'Tech/Docker' in suggestions + + # PostgreSQL + context = "Optimize PostgreSQL query" + suggestions = await tools.suggest_labels(context) + assert 'Tech/PostgreSQL' in suggestions + + +@pytest.mark.asyncio +async def test_suggest_labels_source(): + """Test source detection in suggestions""" + labels = ['Type/Feature', 'Priority/Medium', 'Complexity/Medium', 'Source/Development', 'Source/Staging', 'Source/Production'] + tools = _create_tools_with_labels(labels) + + # Development + context = "Issue found in development environment" + suggestions = await tools.suggest_labels(context) + assert 'Source/Development' in suggestions + + # Production + context = "Critical production issue" + suggestions = await tools.suggest_labels(context) + assert 'Source/Production' in suggestions + + +@pytest.mark.asyncio +async def test_suggest_labels_risk(): + """Test risk detection in suggestions""" + labels = ['Type/Feature', 'Priority/Medium', 'Complexity/Medium', 'Risk/High', 'Risk/Low'] + tools = _create_tools_with_labels(labels) + + # High risk + context = "Breaking change to major API" + suggestions = await tools.suggest_labels(context) + assert 'Risk/High' in suggestions + + # Low risk + context = "Safe minor update with low risk" + suggestions = await tools.suggest_labels(context) + assert 'Risk/Low' in suggestions + + +@pytest.mark.asyncio +async def test_suggest_labels_multiple_categories(): + """Test that suggestions span multiple categories""" + labels = [ + 'Type/Bug', 'Type/Feature', + 'Priority/Critical', 'Priority/Medium', + 'Complexity/Complex', 'Complexity/Medium', + 'Component/Backend', 'Component/API', 'Component/Auth', + 'Tech/FastAPI', 'Tech/PostgreSQL', + 'Source/Production' + ] + tools = _create_tools_with_labels(labels) + + context = """ + Urgent critical bug in production backend API service. + Need to fix broken authentication endpoint. + This is a complex issue requiring FastAPI and PostgreSQL expertise. + """ + + suggestions = await tools.suggest_labels(context) + + # Should have Type + assert any('Type/' in label for label in suggestions) + + # Should have Priority + assert any('Priority/' in label for label in suggestions) + + # Should have Component + assert any('Component/' in label for label in suggestions) + + # Should have Tech + assert any('Tech/' in label for label in suggestions) + + # Should have Source + assert any('Source/' in label for label in suggestions) + + +@pytest.mark.asyncio +async def test_suggest_labels_empty_repo(): + """Test suggestions when no repo specified and no labels available""" + mock_client = Mock() + mock_client.repo = None + tools = LabelTools(mock_client) + + context = "Fix a bug" + suggestions = await tools.suggest_labels(context) + + # Should return empty list when no repo + assert suggestions == [] + + +@pytest.mark.asyncio +async def test_suggest_labels_no_matching_labels(): + """Test suggestions return empty when no matching labels exist""" + labels = ['Custom/Label', 'Other/Thing'] # No standard labels + tools = _create_tools_with_labels(labels) + + context = "Fix a bug" + suggestions = await tools.suggest_labels(context) + + # Should return empty list since no Type/Bug or similar exists + assert len(suggestions) == 0 + + +@pytest.mark.asyncio +async def test_get_labels_org_owned_repo(): + """Test getting labels for organization-owned repository""" + mock_client = Mock() + mock_client.repo = 'myorg/myrepo' + mock_client.is_org_repo = Mock(return_value=True) + mock_client.get_org_labels = Mock(return_value=[ + {'name': 'Type/Bug', 'id': 1}, + {'name': 'Type/Feature', 'id': 2} + ]) + mock_client.get_labels = Mock(return_value=[ + {'name': 'Component/Backend', 'id': 3} + ]) + + tools = LabelTools(mock_client) + result = await tools.get_labels() + + # Should fetch both org and repo labels + mock_client.is_org_repo.assert_called_once_with('myorg/myrepo') + mock_client.get_org_labels.assert_called_once_with('myorg') + mock_client.get_labels.assert_called_once_with('myorg/myrepo') + + assert len(result['organization']) == 2 + assert len(result['repository']) == 1 + assert result['total_count'] == 3 + + +@pytest.mark.asyncio +async def test_get_labels_user_owned_repo(): + """Test getting labels for user-owned repository (no org labels)""" + mock_client = Mock() + mock_client.repo = 'lmiranda/personal-portfolio' + mock_client.is_org_repo = Mock(return_value=False) + mock_client.get_labels = Mock(return_value=[ + {'name': 'bug', 'id': 1}, + {'name': 'enhancement', 'id': 2} + ]) + + tools = LabelTools(mock_client) + result = await tools.get_labels() + + # Should check if org repo + mock_client.is_org_repo.assert_called_once_with('lmiranda/personal-portfolio') + + # Should NOT call get_org_labels for user-owned repos + mock_client.get_org_labels.assert_not_called() + + # Should still get repo labels + mock_client.get_labels.assert_called_once_with('lmiranda/personal-portfolio') + + assert len(result['organization']) == 0 + assert len(result['repository']) == 2 + assert result['total_count'] == 2