feat: scaffold gitea-mcp package with module rename

- Copied source from leo-claude-mktplace/mcp-servers/gitea/ v1.0.0
- Renamed module: mcp_server → gitea_mcp (all imports updated)
- Created pyproject.toml for standalone package identity
- Preserved all existing tools and test suite
- MCP SDK imports unchanged

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-08 15:41:46 -05:00
commit 2b32387864
20 changed files with 5074 additions and 0 deletions

12
.gitignore vendored Normal file
View File

@@ -0,0 +1,12 @@
__pycache__/
*.pyc
*.pyo
*.egg-info/
dist/
build/
.venv/
.env
*.egg
.pytest_cache/
.coverage
htmlcov/

23
gitea_mcp/__init__.py Normal file
View File

@@ -0,0 +1,23 @@
"""
gitea-mcp: MCP server for Gitea integration.
Provides 39 MCP tools for issue management, PRs, wikis, milestones,
dependencies, labels, and merge operations via Gitea API.
Public API for external consumers:
from gitea_mcp import get_tool_definitions, create_tool_dispatcher, GiteaClient, GiteaConfig
"""
__version__ = "1.0.0"
from .tool_registry import get_tool_definitions, create_tool_dispatcher
from .gitea_client import GiteaClient
from .config import GiteaConfig
__all__ = [
"__version__",
"get_tool_definitions",
"create_tool_dispatcher",
"GiteaClient",
"GiteaConfig",
]

227
gitea_mcp/config.py Normal file
View File

@@ -0,0 +1,227 @@
"""
Configuration loader for Gitea MCP Server.
Implements hybrid configuration system:
- System-level: ~/.config/claude/gitea.env (credentials)
- Project-level: .env (repository specification)
- Auto-detection: Falls back to git remote URL parsing
"""
from pathlib import Path
from dotenv import load_dotenv
import os
import re
import subprocess
import logging
from typing import Dict, Optional
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class GiteaConfig:
"""Hybrid configuration loader with mode detection"""
def __init__(self):
self.api_url: Optional[str] = None
self.api_token: Optional[str] = None
self.repo: Optional[str] = None
self.mode: str = 'project'
def load(self) -> Dict[str, Optional[str]]:
"""
Load configuration from system and project levels.
Project-level configuration overrides system-level.
Returns:
Dict containing api_url, api_token, repo, mode
Raises:
FileNotFoundError: If system config is missing
ValueError: If required configuration is missing
"""
# Load system config
system_config = Path.home() / '.config' / 'claude' / 'gitea.env'
if system_config.exists():
load_dotenv(system_config)
logger.info(f"Loaded system configuration from {system_config}")
else:
raise FileNotFoundError(
f"System config not found: {system_config}\n"
"Create it with: mkdir -p ~/.config/claude && "
"cat > ~/.config/claude/gitea.env"
)
# Find project directory (MCP server cwd is plugin dir, not project dir)
project_dir = self._find_project_directory()
# Load project config (overrides system)
if project_dir:
project_config = project_dir / '.env'
if project_config.exists():
load_dotenv(project_config, override=True)
logger.info(f"Loaded project configuration from {project_config}")
# Extract values
self.api_url = os.getenv('GITEA_API_URL')
self.api_token = os.getenv('GITEA_API_TOKEN')
self.repo = os.getenv('GITEA_REPO') # Optional, must be owner/repo format
# Auto-detect repo from git remote if not specified
if not self.repo and project_dir:
self.repo = self._detect_repo_from_git(project_dir)
if self.repo:
logger.info(f"Auto-detected repository from git remote: {self.repo}")
# Detect mode
if self.repo:
self.mode = 'project'
logger.info(f"Running in project mode: {self.repo}")
else:
self.mode = 'company'
logger.info("Running in company-wide mode (PMO)")
# Validate required variables
self._validate()
return {
'api_url': self.api_url,
'api_token': self.api_token,
'repo': self.repo,
'mode': self.mode
}
def _validate(self) -> None:
"""
Validate that required configuration is present.
Raises:
ValueError: If required configuration is missing
"""
required = {
'GITEA_API_URL': self.api_url,
'GITEA_API_TOKEN': self.api_token
}
missing = [key for key, value in required.items() if not value]
if missing:
raise ValueError(
f"Missing required configuration: {', '.join(missing)}\n"
"Check your ~/.config/claude/gitea.env file"
)
def _find_project_directory(self) -> Optional[Path]:
"""
Find the user's project directory.
The MCP server runs with cwd set to the plugin directory, not the
user's project. We need to find the actual project directory using
various heuristics.
Returns:
Path to project directory, or None if not found
"""
# Strategy 1: Check CLAUDE_PROJECT_DIR environment variable
project_dir = os.getenv('CLAUDE_PROJECT_DIR')
if project_dir:
path = Path(project_dir)
if path.exists():
logger.info(f"Found project directory from CLAUDE_PROJECT_DIR: {path}")
return path
# Strategy 2: Check PWD (original working directory before cwd override)
pwd = os.getenv('PWD')
if pwd:
path = Path(pwd)
# Verify it has .git or .env (indicates a project)
if path.exists() and ((path / '.git').exists() or (path / '.env').exists()):
logger.info(f"Found project directory from PWD: {path}")
return path
# Strategy 3: Check current working directory
# This handles test scenarios and cases where cwd is actually the project
cwd = Path.cwd()
if (cwd / '.git').exists() or (cwd / '.env').exists():
logger.info(f"Found project directory from cwd: {cwd}")
return cwd
# Strategy 4: Check if GITEA_REPO is already set (user configured it)
# If so, we don't need to find the project directory for git detection
if os.getenv('GITEA_REPO'):
logger.debug("GITEA_REPO already set, skipping project directory detection")
return None
logger.debug("Could not determine project directory")
return None
def _detect_repo_from_git(self, project_dir: Optional[Path] = None) -> Optional[str]:
"""
Auto-detect repository from git remote origin URL.
Args:
project_dir: Directory to run git command from (defaults to cwd)
Supports URL formats:
- SSH: ssh://git@host:port/owner/repo.git
- SSH short: git@host:owner/repo.git
- HTTPS: https://host/owner/repo.git
- HTTP: http://host/owner/repo.git
Returns:
Repository in 'owner/repo' format, or None if detection fails
"""
try:
result = subprocess.run(
['git', 'remote', 'get-url', 'origin'],
capture_output=True,
text=True,
timeout=5,
cwd=str(project_dir) if project_dir else None
)
if result.returncode != 0:
logger.debug("No git remote 'origin' found")
return None
url = result.stdout.strip()
return self._parse_git_url(url)
except subprocess.TimeoutExpired:
logger.warning("Git command timed out")
return None
except FileNotFoundError:
logger.debug("Git not available")
return None
except Exception as e:
logger.debug(f"Failed to detect repo from git: {e}")
return None
def _parse_git_url(self, url: str) -> Optional[str]:
"""
Parse git URL to extract owner/repo.
Args:
url: Git remote URL
Returns:
Repository in 'owner/repo' format, or None if parsing fails
"""
# Remove .git suffix if present
url = re.sub(r'\.git$', '', url)
# SSH format: ssh://git@host:port/owner/repo
ssh_match = re.match(r'ssh://[^/]+/(.+/.+)$', url)
if ssh_match:
return ssh_match.group(1)
# SSH short format: git@host:owner/repo
ssh_short_match = re.match(r'git@[^:]+:(.+/.+)$', url)
if ssh_short_match:
return ssh_short_match.group(1)
# HTTPS/HTTP format: https://host/owner/repo
http_match = re.match(r'https?://[^/]+/(.+/.+)$', url)
if http_match:
return http_match.group(1)
logger.warning(f"Could not parse git URL: {url}")
return None

849
gitea_mcp/gitea_client.py Normal file
View File

@@ -0,0 +1,849 @@
"""
Gitea API client for interacting with Gitea API.
Provides synchronous methods for:
- Issue CRUD operations
- Label management
- Repository operations
- PMO multi-repo aggregation
- Wiki operations (lessons learned)
- Milestone management
- Issue dependencies
"""
import requests
import logging
import re
from typing import List, Dict, Optional
from .config import GiteaConfig
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class GiteaClient:
"""Client for interacting with Gitea API"""
def __init__(self):
"""Initialize Gitea client with configuration"""
config = GiteaConfig()
config_dict = config.load()
self.base_url = config_dict['api_url']
self.token = config_dict['api_token']
self.repo = config_dict.get('repo') # Optional default repo in owner/repo format
self.mode = config_dict['mode']
self.session = requests.Session()
self.session.headers.update({
'Authorization': f'token {self.token}',
'Content-Type': 'application/json'
})
logger.info(f"Gitea client initialized in {self.mode} mode")
def _parse_repo(self, repo: Optional[str] = None) -> tuple:
"""Parse owner/repo from input. Always requires 'owner/repo' format."""
target = repo or self.repo
if not target or '/' not in target:
raise ValueError("Use 'owner/repo' format (e.g. 'org/repo-name')")
parts = target.split('/', 1)
return parts[0], parts[1]
def list_issues(
self,
state: str = 'open',
labels: Optional[List[str]] = None,
milestone: Optional[str] = None,
repo: Optional[str] = None
) -> List[Dict]:
"""
List issues from Gitea repository.
Args:
state: Issue state (open, closed, all)
labels: Filter by labels
milestone: Filter by milestone title (exact match)
repo: Repository in 'owner/repo' format
Returns:
List of issue dictionaries
"""
owner, target_repo = self._parse_repo(repo)
url = f"{self.base_url}/repos/{owner}/{target_repo}/issues"
params = {'state': state}
if labels:
params['labels'] = ','.join(labels)
if milestone:
params['milestones'] = milestone
logger.info(f"Listing issues from {owner}/{target_repo} with state={state}")
response = self.session.get(url, params=params)
response.raise_for_status()
return response.json()
def get_issue(
self,
issue_number: int,
repo: Optional[str] = None
) -> Dict:
"""Get specific issue details."""
owner, target_repo = self._parse_repo(repo)
url = f"{self.base_url}/repos/{owner}/{target_repo}/issues/{issue_number}"
logger.info(f"Getting issue #{issue_number} from {owner}/{target_repo}")
response = self.session.get(url)
response.raise_for_status()
return response.json()
def create_issue(
self,
title: str,
body: str,
labels: Optional[List[str]] = None,
repo: Optional[str] = None
) -> Dict:
"""Create a new issue in Gitea."""
owner, target_repo = self._parse_repo(repo)
url = f"{self.base_url}/repos/{owner}/{target_repo}/issues"
data = {'title': title, 'body': body}
if labels:
label_ids = self._resolve_label_ids(labels, owner, target_repo)
data['labels'] = label_ids
logger.info(f"Creating issue in {owner}/{target_repo}: {title}")
response = self.session.post(url, json=data)
response.raise_for_status()
return response.json()
def _resolve_label_ids(self, label_names: List[str], owner: str, repo: str) -> List[int]:
"""Convert label names to label IDs."""
full_repo = f"{owner}/{repo}"
# Only fetch org labels if repo belongs to an organization
org_labels = []
if self.is_org_repo(full_repo):
org_labels = self.get_org_labels(owner)
repo_labels = self.get_labels(full_repo)
all_labels = org_labels + repo_labels
label_map = {label['name']: label['id'] for label in all_labels}
label_ids = []
for name in label_names:
if name in label_map:
label_ids.append(label_map[name])
else:
logger.warning(f"Label '{name}' not found, skipping")
return label_ids
def update_issue(
self,
issue_number: int,
title: Optional[str] = None,
body: Optional[str] = None,
state: Optional[str] = None,
labels: Optional[List[str]] = None,
milestone: Optional[int] = None,
repo: Optional[str] = None
) -> Dict:
"""
Update existing issue.
Args:
issue_number: Issue number to update
title: New title (optional)
body: New body (optional)
state: New state - 'open' or 'closed' (optional)
labels: New labels (optional)
milestone: Milestone ID to assign (optional)
repo: Repository in 'owner/repo' format
Returns:
Updated issue dictionary
"""
owner, target_repo = self._parse_repo(repo)
url = f"{self.base_url}/repos/{owner}/{target_repo}/issues/{issue_number}"
data = {}
if title is not None:
data['title'] = title
if body is not None:
data['body'] = body
if state is not None:
data['state'] = state
if labels is not None:
data['labels'] = labels
if milestone is not None:
data['milestone'] = milestone
logger.info(f"Updating issue #{issue_number} in {owner}/{target_repo}")
response = self.session.patch(url, json=data)
response.raise_for_status()
return response.json()
def add_comment(
self,
issue_number: int,
comment: str,
repo: Optional[str] = None
) -> Dict:
"""Add comment to issue. Repo must be 'owner/repo' format."""
owner, target_repo = self._parse_repo(repo)
url = f"{self.base_url}/repos/{owner}/{target_repo}/issues/{issue_number}/comments"
data = {'body': comment}
logger.info(f"Adding comment to issue #{issue_number} in {owner}/{target_repo}")
response = self.session.post(url, json=data)
response.raise_for_status()
return response.json()
def get_labels(self, repo: Optional[str] = None) -> List[Dict]:
"""Get all labels from repository. Repo must be 'owner/repo' format."""
owner, target_repo = self._parse_repo(repo)
url = f"{self.base_url}/repos/{owner}/{target_repo}/labels"
logger.info(f"Getting labels from {owner}/{target_repo}")
response = self.session.get(url)
response.raise_for_status()
return response.json()
def get_org_labels(self, org: str) -> List[Dict]:
"""Get organization-level labels. Org is the organization name."""
url = f"{self.base_url}/orgs/{org}/labels"
logger.info(f"Getting organization labels for {org}")
response = self.session.get(url)
response.raise_for_status()
return response.json()
def list_repos(self, org: str) -> List[Dict]:
"""List all repositories in organization. Org is the organization name."""
url = f"{self.base_url}/orgs/{org}/repos"
logger.info(f"Listing all repositories for organization {org}")
response = self.session.get(url)
response.raise_for_status()
return response.json()
def aggregate_issues(
self,
org: str,
state: str = 'open',
labels: Optional[List[str]] = None
) -> Dict[str, List[Dict]]:
"""Fetch issues across all repositories in org."""
repos = self.list_repos(org)
aggregated = {}
logger.info(f"Aggregating issues across {len(repos)} repositories")
for repo in repos:
repo_name = repo['name']
try:
issues = self.list_issues(
state=state,
labels=labels,
repo=f"{org}/{repo_name}"
)
if issues:
aggregated[repo_name] = issues
logger.info(f"Found {len(issues)} issues in {repo_name}")
except Exception as e:
logger.error(f"Error fetching issues from {repo_name}: {e}")
return aggregated
# ========================================
# WIKI OPERATIONS (Lessons Learned)
# ========================================
def list_wiki_pages(self, repo: Optional[str] = None) -> List[Dict]:
"""List all wiki pages in repository."""
owner, target_repo = self._parse_repo(repo)
url = f"{self.base_url}/repos/{owner}/{target_repo}/wiki/pages"
logger.info(f"Listing wiki pages from {owner}/{target_repo}")
response = self.session.get(url)
response.raise_for_status()
return response.json()
def get_wiki_page(
self,
page_name: str,
repo: Optional[str] = None
) -> Dict:
"""Get a specific wiki page by name."""
from urllib.parse import quote
owner, target_repo = self._parse_repo(repo)
# URL-encode the page_name to handle special characters like ':'
encoded_page_name = quote(page_name, safe='')
url = f"{self.base_url}/repos/{owner}/{target_repo}/wiki/page/{encoded_page_name}"
logger.info(f"Getting wiki page '{page_name}' from {owner}/{target_repo}")
response = self.session.get(url)
response.raise_for_status()
return response.json()
def create_wiki_page(
self,
title: str,
content: str,
repo: Optional[str] = None
) -> Dict:
"""Create a new wiki page."""
owner, target_repo = self._parse_repo(repo)
url = f"{self.base_url}/repos/{owner}/{target_repo}/wiki/new"
data = {
'title': title,
'content_base64': self._encode_base64(content)
}
logger.info(f"Creating wiki page '{title}' in {owner}/{target_repo}")
response = self.session.post(url, json=data)
response.raise_for_status()
return response.json()
def update_wiki_page(
self,
page_name: str,
content: str,
repo: Optional[str] = None
) -> Dict:
"""Update an existing wiki page."""
from urllib.parse import quote
owner, target_repo = self._parse_repo(repo)
# URL-encode the page_name to handle special characters like ':'
encoded_page_name = quote(page_name, safe='')
url = f"{self.base_url}/repos/{owner}/{target_repo}/wiki/page/{encoded_page_name}"
data = {
'title': page_name, # CRITICAL: include title to preserve page name
'content_base64': self._encode_base64(content)
}
logger.info(f"Updating wiki page '{page_name}' in {owner}/{target_repo}")
response = self.session.patch(url, json=data)
response.raise_for_status()
return response.json()
def delete_wiki_page(
self,
page_name: str,
repo: Optional[str] = None
) -> bool:
"""Delete a wiki page."""
from urllib.parse import quote
owner, target_repo = self._parse_repo(repo)
# URL-encode the page_name to handle special characters like ':'
encoded_page_name = quote(page_name, safe='')
url = f"{self.base_url}/repos/{owner}/{target_repo}/wiki/page/{encoded_page_name}"
logger.info(f"Deleting wiki page '{page_name}' from {owner}/{target_repo}")
response = self.session.delete(url)
response.raise_for_status()
return True
def _encode_base64(self, content: str) -> str:
"""Encode content to base64 for wiki API."""
import base64
return base64.b64encode(content.encode('utf-8')).decode('utf-8')
def _decode_base64(self, content: str) -> str:
"""Decode base64 content from wiki API."""
import base64
return base64.b64decode(content.encode('utf-8')).decode('utf-8')
def search_wiki_pages(
self,
query: str,
repo: Optional[str] = None
) -> List[Dict]:
"""Search wiki pages by content (client-side filtering)."""
pages = self.list_wiki_pages(repo)
results = []
query_lower = query.lower()
for page in pages:
if query_lower in page.get('title', '').lower():
results.append(page)
return results
def create_lesson(
self,
title: str,
content: str,
tags: List[str],
category: str = "sprints",
repo: Optional[str] = None
) -> Dict:
"""Create a lessons learned entry in the wiki."""
# Sanitize title for wiki page name
page_name = f"lessons/{category}/{self._sanitize_page_name(title)}"
# Add tags as metadata at the end of content
full_content = f"{content}\n\n---\n**Tags:** {', '.join(tags)}"
return self.create_wiki_page(page_name, full_content, repo)
def search_lessons(
self,
query: Optional[str] = None,
tags: Optional[List[str]] = None,
repo: Optional[str] = None
) -> List[Dict]:
"""Search lessons learned by query and/or tags."""
pages = self.list_wiki_pages(repo)
results = []
for page in pages:
title = page.get('title', '')
# Filter to only lessons (pages starting with lessons/)
if not title.startswith('lessons/'):
continue
# If query provided, check if it matches title
if query:
if query.lower() not in title.lower():
continue
# Get full page content for tag matching if tags provided
if tags:
try:
full_page = self.get_wiki_page(title, repo)
content = self._decode_base64(full_page.get('content_base64', ''))
# Check if any tag is in the content
if not any(tag.lower() in content.lower() for tag in tags):
continue
except Exception:
continue
results.append(page)
return results
def _sanitize_page_name(self, title: str) -> str:
"""Convert title to valid wiki page name."""
# Replace spaces with hyphens, remove special chars
name = re.sub(r'[^\w\s-]', '', title)
name = re.sub(r'[\s]+', '-', name)
return name.lower()
# ========================================
# MILESTONE OPERATIONS
# ========================================
def list_milestones(
self,
state: str = 'open',
repo: Optional[str] = None
) -> List[Dict]:
"""List all milestones in repository."""
owner, target_repo = self._parse_repo(repo)
url = f"{self.base_url}/repos/{owner}/{target_repo}/milestones"
params = {'state': state}
logger.info(f"Listing milestones from {owner}/{target_repo}")
response = self.session.get(url, params=params)
response.raise_for_status()
return response.json()
def get_milestone(
self,
milestone_id: int,
repo: Optional[str] = None
) -> Dict:
"""Get a specific milestone by ID."""
owner, target_repo = self._parse_repo(repo)
url = f"{self.base_url}/repos/{owner}/{target_repo}/milestones/{milestone_id}"
logger.info(f"Getting milestone #{milestone_id} from {owner}/{target_repo}")
response = self.session.get(url)
response.raise_for_status()
return response.json()
def create_milestone(
self,
title: str,
description: Optional[str] = None,
due_on: Optional[str] = None,
repo: Optional[str] = None
) -> Dict:
"""Create a new milestone."""
owner, target_repo = self._parse_repo(repo)
url = f"{self.base_url}/repos/{owner}/{target_repo}/milestones"
data = {'title': title}
if description:
data['description'] = description
if due_on:
data['due_on'] = due_on
logger.info(f"Creating milestone '{title}' in {owner}/{target_repo}")
response = self.session.post(url, json=data)
response.raise_for_status()
return response.json()
def update_milestone(
self,
milestone_id: int,
title: Optional[str] = None,
description: Optional[str] = None,
state: Optional[str] = None,
due_on: Optional[str] = None,
repo: Optional[str] = None
) -> Dict:
"""Update an existing milestone."""
owner, target_repo = self._parse_repo(repo)
url = f"{self.base_url}/repos/{owner}/{target_repo}/milestones/{milestone_id}"
data = {}
if title is not None:
data['title'] = title
if description is not None:
data['description'] = description
if state is not None:
data['state'] = state
if due_on is not None:
data['due_on'] = due_on
logger.info(f"Updating milestone #{milestone_id} in {owner}/{target_repo}")
response = self.session.patch(url, json=data)
response.raise_for_status()
return response.json()
def delete_milestone(
self,
milestone_id: int,
repo: Optional[str] = None
) -> bool:
"""Delete a milestone."""
owner, target_repo = self._parse_repo(repo)
url = f"{self.base_url}/repos/{owner}/{target_repo}/milestones/{milestone_id}"
logger.info(f"Deleting milestone #{milestone_id} from {owner}/{target_repo}")
response = self.session.delete(url)
response.raise_for_status()
return True
# ========================================
# ISSUE DEPENDENCY OPERATIONS
# ========================================
def list_issue_dependencies(
self,
issue_number: int,
repo: Optional[str] = None
) -> List[Dict]:
"""List all dependencies for an issue (issues that block this one)."""
owner, target_repo = self._parse_repo(repo)
url = f"{self.base_url}/repos/{owner}/{target_repo}/issues/{issue_number}/dependencies"
logger.info(f"Listing dependencies for issue #{issue_number} in {owner}/{target_repo}")
response = self.session.get(url)
response.raise_for_status()
return response.json()
def create_issue_dependency(
self,
issue_number: int,
depends_on: int,
repo: Optional[str] = None
) -> Dict:
"""Create a dependency (issue_number depends on depends_on)."""
owner, target_repo = self._parse_repo(repo)
url = f"{self.base_url}/repos/{owner}/{target_repo}/issues/{issue_number}/dependencies"
data = {
'dependentIssue': {
'owner': owner,
'repo': target_repo,
'index': depends_on
}
}
logger.info(f"Creating dependency: #{issue_number} depends on #{depends_on} in {owner}/{target_repo}")
response = self.session.post(url, json=data)
response.raise_for_status()
return response.json()
def remove_issue_dependency(
self,
issue_number: int,
depends_on: int,
repo: Optional[str] = None
) -> bool:
"""Remove a dependency between issues."""
owner, target_repo = self._parse_repo(repo)
url = f"{self.base_url}/repos/{owner}/{target_repo}/issues/{issue_number}/dependencies"
data = {
'dependentIssue': {
'owner': owner,
'repo': target_repo,
'index': depends_on
}
}
logger.info(f"Removing dependency: #{issue_number} no longer depends on #{depends_on}")
response = self.session.delete(url, json=data)
response.raise_for_status()
return True
def list_issue_blocks(
self,
issue_number: int,
repo: Optional[str] = None
) -> List[Dict]:
"""List all issues that this issue blocks."""
owner, target_repo = self._parse_repo(repo)
url = f"{self.base_url}/repos/{owner}/{target_repo}/issues/{issue_number}/blocks"
logger.info(f"Listing issues blocked by #{issue_number} in {owner}/{target_repo}")
response = self.session.get(url)
response.raise_for_status()
return response.json()
# ========================================
# REPOSITORY VALIDATION
# ========================================
def get_repo_info(self, repo: Optional[str] = None) -> Dict:
"""Get repository information including owner type."""
owner, target_repo = self._parse_repo(repo)
url = f"{self.base_url}/repos/{owner}/{target_repo}"
logger.info(f"Getting repo info for {owner}/{target_repo}")
response = self.session.get(url)
response.raise_for_status()
return response.json()
def is_org_repo(self, repo: Optional[str] = None) -> bool:
"""
Check if repository belongs to an organization (not a user).
Uses the /orgs/{owner} endpoint to reliably detect organizations,
as the owner.type field in repo info may be null in some Gitea versions.
"""
owner, _ = self._parse_repo(repo)
return self._is_organization(owner)
def _is_organization(self, owner: str) -> bool:
"""
Check if an owner is an organization by querying the orgs endpoint.
Args:
owner: The owner name to check
Returns:
True if owner is an organization, False if user or unknown
"""
url = f"{self.base_url}/orgs/{owner}"
try:
response = self.session.get(url)
# 200 = organization exists, 404 = not an organization (user account)
return response.status_code == 200
except Exception as e:
logger.warning(f"Failed to check if {owner} is organization: {e}")
return False
def get_branch_protection(
self,
branch: str,
repo: Optional[str] = None
) -> Optional[Dict]:
"""Get branch protection rules for a branch."""
owner, target_repo = self._parse_repo(repo)
url = f"{self.base_url}/repos/{owner}/{target_repo}/branch_protections/{branch}"
logger.info(f"Getting branch protection for {branch} in {owner}/{target_repo}")
try:
response = self.session.get(url)
response.raise_for_status()
return response.json()
except requests.exceptions.HTTPError as e:
if e.response.status_code == 404:
return None # No protection rules
raise
def create_label(
self,
name: str,
color: str,
description: Optional[str] = None,
repo: Optional[str] = None
) -> Dict:
"""Create a new label in the repository."""
owner, target_repo = self._parse_repo(repo)
url = f"{self.base_url}/repos/{owner}/{target_repo}/labels"
data = {
'name': name,
'color': color.lstrip('#') # Remove # if present
}
if description:
data['description'] = description
logger.info(f"Creating label '{name}' in {owner}/{target_repo}")
response = self.session.post(url, json=data)
response.raise_for_status()
return response.json()
def create_org_label(
self,
org: str,
name: str,
color: str,
description: Optional[str] = None
) -> Dict:
"""
Create a new label at the organization level.
Organization labels are shared across all repositories in the org.
Use this for workflow labels (Type, Priority, Complexity, Effort, etc.)
Args:
org: Organization name
name: Label name (e.g., 'Type/Bug', 'Priority/High')
color: Hex color code (with or without #)
description: Optional label description
Returns:
Created label dictionary
"""
url = f"{self.base_url}/orgs/{org}/labels"
data = {
'name': name,
'color': color.lstrip('#') # Remove # if present
}
if description:
data['description'] = description
logger.info(f"Creating organization label '{name}' in {org}")
response = self.session.post(url, json=data)
response.raise_for_status()
return response.json()
# ========================================
# PULL REQUEST OPERATIONS
# ========================================
def list_pull_requests(
self,
state: str = 'open',
sort: str = 'recentupdate',
labels: Optional[List[str]] = None,
repo: Optional[str] = None
) -> List[Dict]:
"""
List pull requests from Gitea repository.
Args:
state: PR state (open, closed, all)
sort: Sort order (oldest, recentupdate, leastupdate, mostcomment, leastcomment, priority)
labels: Filter by labels
repo: Repository in 'owner/repo' format
Returns:
List of pull request dictionaries
"""
owner, target_repo = self._parse_repo(repo)
url = f"{self.base_url}/repos/{owner}/{target_repo}/pulls"
params = {'state': state, 'sort': sort}
if labels:
params['labels'] = ','.join(labels)
logger.info(f"Listing PRs from {owner}/{target_repo} with state={state}")
response = self.session.get(url, params=params)
response.raise_for_status()
return response.json()
def get_pull_request(
self,
pr_number: int,
repo: Optional[str] = None
) -> Dict:
"""Get specific pull request details."""
owner, target_repo = self._parse_repo(repo)
url = f"{self.base_url}/repos/{owner}/{target_repo}/pulls/{pr_number}"
logger.info(f"Getting PR #{pr_number} from {owner}/{target_repo}")
response = self.session.get(url)
response.raise_for_status()
return response.json()
def get_pr_diff(
self,
pr_number: int,
repo: Optional[str] = None
) -> str:
"""Get the diff for a pull request."""
owner, target_repo = self._parse_repo(repo)
url = f"{self.base_url}/repos/{owner}/{target_repo}/pulls/{pr_number}.diff"
logger.info(f"Getting diff for PR #{pr_number} from {owner}/{target_repo}")
response = self.session.get(url)
response.raise_for_status()
return response.text
def get_pr_comments(
self,
pr_number: int,
repo: Optional[str] = None
) -> List[Dict]:
"""Get comments on a pull request (uses issue comments endpoint)."""
owner, target_repo = self._parse_repo(repo)
# PRs share comment endpoint with issues in Gitea
url = f"{self.base_url}/repos/{owner}/{target_repo}/issues/{pr_number}/comments"
logger.info(f"Getting comments for PR #{pr_number} from {owner}/{target_repo}")
response = self.session.get(url)
response.raise_for_status()
return response.json()
def create_pr_review(
self,
pr_number: int,
body: str,
event: str = 'COMMENT',
comments: Optional[List[Dict]] = None,
repo: Optional[str] = None
) -> Dict:
"""
Create a review on a pull request.
Args:
pr_number: Pull request number
body: Review body/summary
event: Review action (APPROVE, REQUEST_CHANGES, COMMENT)
comments: Optional list of inline comments with path, position, body
repo: Repository in 'owner/repo' format
Returns:
Created review dictionary
"""
owner, target_repo = self._parse_repo(repo)
url = f"{self.base_url}/repos/{owner}/{target_repo}/pulls/{pr_number}/reviews"
data = {
'body': body,
'event': event
}
if comments:
data['comments'] = comments
logger.info(f"Creating review on PR #{pr_number} in {owner}/{target_repo}")
response = self.session.post(url, json=data)
response.raise_for_status()
return response.json()
def add_pr_comment(
self,
pr_number: int,
body: str,
repo: Optional[str] = None
) -> Dict:
"""Add a general comment to a pull request (uses issue comment endpoint)."""
owner, target_repo = self._parse_repo(repo)
# PRs share comment endpoint with issues in Gitea
url = f"{self.base_url}/repos/{owner}/{target_repo}/issues/{pr_number}/comments"
data = {'body': body}
logger.info(f"Adding comment to PR #{pr_number} in {owner}/{target_repo}")
response = self.session.post(url, json=data)
response.raise_for_status()
return response.json()
def create_pull_request(
self,
title: str,
body: str,
head: str,
base: str,
labels: Optional[List[str]] = None,
repo: Optional[str] = None
) -> Dict:
"""
Create a new pull request.
Args:
title: PR title
body: PR description/body
head: Source branch name (the branch with changes)
base: Target branch name (the branch to merge into)
labels: Optional list of label names
repo: Repository in 'owner/repo' format
Returns:
Created pull request dictionary
"""
owner, target_repo = self._parse_repo(repo)
url = f"{self.base_url}/repos/{owner}/{target_repo}/pulls"
data = {
'title': title,
'body': body,
'head': head,
'base': base
}
if labels:
label_ids = self._resolve_label_ids(labels, owner, target_repo)
data['labels'] = label_ids
logger.info(f"Creating PR '{title}' in {owner}/{target_repo}: {head} -> {base}")
response = self.session.post(url, json=data)
response.raise_for_status()
return response.json()

93
gitea_mcp/server.py Normal file
View File

@@ -0,0 +1,93 @@
"""
MCP Server entry point for Gitea integration.
Provides Gitea tools to Claude Code via JSON-RPC 2.0 over stdio.
"""
import asyncio
import logging
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
from .config import GiteaConfig
from .gitea_client import GiteaClient
from .tool_registry import get_tool_definitions, create_tool_dispatcher
# Suppress noisy MCP validation warnings on stderr
logging.basicConfig(level=logging.INFO)
logging.getLogger("root").setLevel(logging.ERROR)
logging.getLogger("mcp").setLevel(logging.ERROR)
logger = logging.getLogger(__name__)
class GiteaMCPServer:
"""MCP Server for Gitea integration"""
def __init__(self):
self.server = Server("gitea-mcp")
self.config = None
self.client = None
self._dispatcher = None
async def initialize(self):
"""
Initialize server and load configuration.
Raises:
Exception: If initialization fails
"""
try:
config_loader = GiteaConfig()
self.config = config_loader.load()
self.client = GiteaClient()
self._dispatcher = create_tool_dispatcher(self.client)
logger.info(f"Gitea MCP Server initialized in {self.config['mode']} mode")
except Exception as e:
logger.error(f"Failed to initialize: {e}")
raise
def setup_tools(self):
"""Register all available tools with the MCP server"""
@self.server.list_tools()
async def list_tools() -> list[Tool]:
"""Return list of available tools"""
return get_tool_definitions()
@self.server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
"""
Handle tool invocation.
Args:
name: Tool name
arguments: Tool arguments
Returns:
List of TextContent with results
"""
return await self._dispatcher(name, arguments)
async def run(self):
"""Run the MCP server"""
await self.initialize()
self.setup_tools()
async with stdio_server() as (read_stream, write_stream):
await self.server.run(
read_stream,
write_stream,
self.server.create_initialization_options()
)
async def main():
"""Main entry point"""
server = GiteaMCPServer()
await server.run()
if __name__ == "__main__":
asyncio.run(main())

1098
gitea_mcp/tool_registry.py Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,11 @@
"""
MCP tools for Gitea integration.
This package provides MCP tool implementations for:
- Issue operations (issues.py)
- Label management (labels.py)
- Wiki operations (wiki.py)
- Milestone management (milestones.py)
- Issue dependencies (dependencies.py)
- Pull request operations (pull_requests.py)
"""

View File

@@ -0,0 +1,216 @@
"""
Issue dependency management tools for MCP server.
Provides async wrappers for issue dependency operations:
- List/create/remove dependencies
- Build dependency graphs for parallel execution
"""
import asyncio
import logging
from typing import List, Dict, Optional, Set, Tuple
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class DependencyTools:
"""Async wrappers for Gitea issue dependency operations"""
def __init__(self, gitea_client):
"""
Initialize dependency tools.
Args:
gitea_client: GiteaClient instance
"""
self.gitea = gitea_client
async def list_issue_dependencies(
self,
issue_number: int,
repo: Optional[str] = None
) -> List[Dict]:
"""
List all dependencies for an issue (issues that block this one).
Args:
issue_number: Issue number
repo: Repository in owner/repo format
Returns:
List of issues that this issue depends on
"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None,
lambda: self.gitea.list_issue_dependencies(issue_number, repo)
)
async def create_issue_dependency(
self,
issue_number: int,
depends_on: int,
repo: Optional[str] = None
) -> Dict:
"""
Create a dependency between issues.
Args:
issue_number: The issue that will depend on another
depends_on: The issue that blocks issue_number
repo: Repository in owner/repo format
Returns:
Created dependency information
"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None,
lambda: self.gitea.create_issue_dependency(issue_number, depends_on, repo)
)
async def remove_issue_dependency(
self,
issue_number: int,
depends_on: int,
repo: Optional[str] = None
) -> bool:
"""
Remove a dependency between issues.
Args:
issue_number: The issue that currently depends on another
depends_on: The issue being depended on
repo: Repository in owner/repo format
Returns:
True if removed successfully
"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None,
lambda: self.gitea.remove_issue_dependency(issue_number, depends_on, repo)
)
async def list_issue_blocks(
self,
issue_number: int,
repo: Optional[str] = None
) -> List[Dict]:
"""
List all issues that this issue blocks.
Args:
issue_number: Issue number
repo: Repository in owner/repo format
Returns:
List of issues blocked by this issue
"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None,
lambda: self.gitea.list_issue_blocks(issue_number, repo)
)
async def build_dependency_graph(
self,
issue_numbers: List[int],
repo: Optional[str] = None
) -> Dict[int, List[int]]:
"""
Build a dependency graph for a list of issues.
Args:
issue_numbers: List of issue numbers to analyze
repo: Repository in owner/repo format
Returns:
Dictionary mapping issue_number -> list of issues it depends on
"""
graph = {}
for issue_num in issue_numbers:
try:
deps = await self.list_issue_dependencies(issue_num, repo)
graph[issue_num] = [
d.get('number') or d.get('index')
for d in deps
if (d.get('number') or d.get('index')) in issue_numbers
]
except Exception as e:
logger.warning(f"Could not fetch dependencies for #{issue_num}: {e}")
graph[issue_num] = []
return graph
async def get_ready_tasks(
self,
issue_numbers: List[int],
completed: Set[int],
repo: Optional[str] = None
) -> List[int]:
"""
Get tasks that are ready to execute (no unresolved dependencies).
Args:
issue_numbers: List of all issue numbers in sprint
completed: Set of already completed issue numbers
repo: Repository in owner/repo format
Returns:
List of issue numbers that can be executed now
"""
graph = await self.build_dependency_graph(issue_numbers, repo)
ready = []
for issue_num in issue_numbers:
if issue_num in completed:
continue
deps = graph.get(issue_num, [])
# Task is ready if all its dependencies are completed
if all(dep in completed for dep in deps):
ready.append(issue_num)
return ready
async def get_execution_order(
self,
issue_numbers: List[int],
repo: Optional[str] = None
) -> List[List[int]]:
"""
Get a parallelizable execution order for issues.
Returns batches of issues that can be executed in parallel.
Each batch contains issues with no unresolved dependencies.
Args:
issue_numbers: List of all issue numbers
repo: Repository in owner/repo format
Returns:
List of batches, where each batch can be executed in parallel
"""
graph = await self.build_dependency_graph(issue_numbers, repo)
completed: Set[int] = set()
remaining = set(issue_numbers)
batches = []
while remaining:
# Find all tasks with no unresolved dependencies
batch = []
for issue_num in remaining:
deps = graph.get(issue_num, [])
if all(dep in completed for dep in deps):
batch.append(issue_num)
if not batch:
# Circular dependency detected
logger.error(f"Circular dependency detected! Remaining: {remaining}")
batch = list(remaining) # Force include remaining to avoid infinite loop
batches.append(batch)
completed.update(batch)
remaining -= set(batch)
return batches

287
gitea_mcp/tools/issues.py Normal file
View File

@@ -0,0 +1,287 @@
"""
Issue management tools for MCP server.
Provides async wrappers for issue CRUD operations with:
- Branch-aware security
- PMO multi-repo support
- Comprehensive error handling
"""
import asyncio
import os
import subprocess
import logging
from typing import List, Dict, Optional
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class IssueTools:
"""Async wrappers for Gitea issue operations with branch detection"""
def __init__(self, gitea_client):
"""
Initialize issue tools.
Args:
gitea_client: GiteaClient instance
"""
self.gitea = gitea_client
def _get_project_directory(self) -> Optional[str]:
"""
Get the user's project directory from environment.
Returns:
Project directory path or None if not set
"""
return os.environ.get('CLAUDE_PROJECT_DIR')
def _get_current_branch(self) -> str:
"""
Get current git branch from user's project directory.
Uses CLAUDE_PROJECT_DIR environment variable to determine the correct
directory for git operations, avoiding the bug where git runs from
the installed plugin directory instead of the user's project.
Returns:
Current branch name or 'unknown' if not in a git repo
"""
try:
project_dir = self._get_project_directory()
result = subprocess.run(
['git', 'rev-parse', '--abbrev-ref', 'HEAD'],
capture_output=True,
text=True,
check=True,
cwd=project_dir # Run git in project directory, not plugin directory
)
return result.stdout.strip()
except subprocess.CalledProcessError:
return "unknown"
def _check_branch_permissions(self, operation: str) -> bool:
"""
Check if operation is allowed on current branch.
Args:
operation: Operation name (list_issues, create_issue, etc.)
Returns:
True if operation is allowed, False otherwise
"""
branch = self._get_current_branch()
# Production branches (read-only except incidents)
if branch in ['main', 'master'] or branch.startswith('prod/'):
return operation in ['list_issues', 'get_issue', 'get_labels']
# Staging branches (read-only for code)
if branch == 'staging' or branch.startswith('stage/'):
return operation in ['list_issues', 'get_issue', 'get_labels', 'create_issue']
# Development branches (full access)
# Include all common feature/fix branch patterns
dev_prefixes = (
'feat/', 'feature/', 'dev/',
'fix/', 'bugfix/', 'hotfix/',
'chore/', 'refactor/', 'docs/', 'test/'
)
if branch in ['development', 'develop'] or branch.startswith(dev_prefixes):
return True
# Unknown branch - be restrictive
return False
async def list_issues(
self,
state: str = 'open',
labels: Optional[List[str]] = None,
milestone: Optional[str] = None,
repo: Optional[str] = None
) -> List[Dict]:
"""
List issues from repository (async wrapper).
Args:
state: Issue state (open, closed, all)
labels: Filter by labels
milestone: Filter by milestone title (exact match)
repo: Override configured repo (for PMO multi-repo)
Returns:
List of issue dictionaries
Raises:
PermissionError: If operation not allowed on current branch
"""
if not self._check_branch_permissions('list_issues'):
branch = self._get_current_branch()
raise PermissionError(
f"Cannot list issues on branch '{branch}'. "
f"Switch to a development branch."
)
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None,
lambda: self.gitea.list_issues(state, labels, milestone, repo)
)
async def get_issue(
self,
issue_number: int,
repo: Optional[str] = None
) -> Dict:
"""
Get specific issue details (async wrapper).
Args:
issue_number: Issue number
repo: Override configured repo (for PMO multi-repo)
Returns:
Issue dictionary
Raises:
PermissionError: If operation not allowed on current branch
"""
if not self._check_branch_permissions('get_issue'):
branch = self._get_current_branch()
raise PermissionError(
f"Cannot get issue on branch '{branch}'. "
f"Switch to a development branch."
)
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None,
lambda: self.gitea.get_issue(issue_number, repo)
)
async def create_issue(
self,
title: str,
body: str,
labels: Optional[List[str]] = None,
repo: Optional[str] = None
) -> Dict:
"""
Create new issue (async wrapper with branch check).
Args:
title: Issue title
body: Issue description
labels: List of label names
repo: Override configured repo (for PMO multi-repo)
Returns:
Created issue dictionary
Raises:
PermissionError: If operation not allowed on current branch
"""
if not self._check_branch_permissions('create_issue'):
branch = self._get_current_branch()
raise PermissionError(
f"Cannot create issues on branch '{branch}'. "
f"Switch to a development branch to create issues."
)
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None,
lambda: self.gitea.create_issue(title, body, labels, repo)
)
async def update_issue(
self,
issue_number: int,
title: Optional[str] = None,
body: Optional[str] = None,
state: Optional[str] = None,
labels: Optional[List[str]] = None,
milestone: Optional[int] = None,
repo: Optional[str] = None
) -> Dict:
"""
Update existing issue (async wrapper with branch check).
Args:
issue_number: Issue number
title: New title (optional)
body: New body (optional)
state: New state - 'open' or 'closed' (optional)
labels: New labels (optional)
milestone: Milestone ID to assign (optional)
repo: Override configured repo (for PMO multi-repo)
Returns:
Updated issue dictionary
Raises:
PermissionError: If operation not allowed on current branch
"""
if not self._check_branch_permissions('update_issue'):
branch = self._get_current_branch()
raise PermissionError(
f"Cannot update issues on branch '{branch}'. "
f"Switch to a development branch to update issues."
)
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None,
lambda: self.gitea.update_issue(issue_number, title, body, state, labels, milestone, repo)
)
async def add_comment(
self,
issue_number: int,
comment: str,
repo: Optional[str] = None
) -> Dict:
"""
Add comment to issue (async wrapper with branch check).
Args:
issue_number: Issue number
comment: Comment text
repo: Override configured repo (for PMO multi-repo)
Returns:
Created comment dictionary
Raises:
PermissionError: If operation not allowed on current branch
"""
if not self._check_branch_permissions('add_comment'):
branch = self._get_current_branch()
raise PermissionError(
f"Cannot add comments on branch '{branch}'. "
f"Switch to a development branch to add comments."
)
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None,
lambda: self.gitea.add_comment(issue_number, comment, repo)
)
async def aggregate_issues(
self,
org: str,
state: str = 'open',
labels: Optional[List[str]] = None
) -> Dict[str, List[Dict]]:
"""Aggregate issues across all repositories in org."""
if not self._check_branch_permissions('aggregate_issues'):
branch = self._get_current_branch()
raise PermissionError(f"Cannot aggregate issues on branch '{branch}'.")
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None,
lambda: self.gitea.aggregate_issues(org, state, labels)
)

377
gitea_mcp/tools/labels.py Normal file
View File

@@ -0,0 +1,377 @@
"""
Label management tools for MCP server.
Provides async wrappers for label operations with:
- Label taxonomy retrieval
- Intelligent label suggestion
- Dynamic label detection
"""
import asyncio
import logging
import re
from typing import List, Dict, Optional
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class LabelTools:
"""Async wrappers for Gitea label operations"""
def __init__(self, gitea_client):
"""
Initialize label tools.
Args:
gitea_client: GiteaClient instance
"""
self.gitea = gitea_client
async def get_labels(self, repo: Optional[str] = None) -> Dict[str, List[Dict]]:
"""Get all labels (org + repo if org-owned, repo-only if user-owned)."""
loop = asyncio.get_event_loop()
target_repo = repo or self.gitea.repo
if not target_repo or '/' not in target_repo:
raise ValueError("Use 'owner/repo' format (e.g. 'org/repo-name')")
# Check if repo belongs to an organization or user
is_org = await loop.run_in_executor(
None,
lambda: self.gitea.is_org_repo(target_repo)
)
org_labels = []
if is_org:
org = target_repo.split('/')[0]
org_labels = await loop.run_in_executor(
None,
lambda: self.gitea.get_org_labels(org)
)
repo_labels = await loop.run_in_executor(
None,
lambda: self.gitea.get_labels(target_repo)
)
return {
'organization': org_labels,
'repository': repo_labels,
'total_count': len(org_labels) + len(repo_labels)
}
async def suggest_labels(self, context: str, repo: Optional[str] = None) -> List[str]:
"""
Analyze context and suggest appropriate labels from repository's actual labels.
This method fetches actual labels from the repository and matches them
dynamically, supporting any label naming convention (slash, colon-space, etc.).
Args:
context: Issue title + description or sprint context
repo: Repository in 'owner/repo' format (optional, uses default if not provided)
Returns:
List of suggested label names that exist in the repository
"""
# Fetch actual labels from repository
target_repo = repo or self.gitea.repo
if not target_repo:
logger.warning("No repository specified, returning empty suggestions")
return []
try:
labels_data = await self.get_labels(target_repo)
all_labels = labels_data.get('organization', []) + labels_data.get('repository', [])
label_names = [label['name'] for label in all_labels]
except Exception as e:
logger.warning(f"Failed to fetch labels: {e}. Using fallback suggestions.")
label_names = []
# Build label lookup for dynamic matching
label_lookup = self._build_label_lookup(label_names)
suggested = []
context_lower = context.lower()
# Type detection (exclusive - only one)
type_label = None
if any(word in context_lower for word in ['bug', 'error', 'fix', 'broken', 'crash', 'fail']):
type_label = self._find_label(label_lookup, 'type', 'bug')
elif any(word in context_lower for word in ['refactor', 'extract', 'restructure', 'architecture', 'service extraction']):
type_label = self._find_label(label_lookup, 'type', 'refactor')
elif any(word in context_lower for word in ['feature', 'add', 'implement', 'new', 'create']):
type_label = self._find_label(label_lookup, 'type', 'feature')
elif any(word in context_lower for word in ['docs', 'documentation', 'readme', 'guide']):
type_label = self._find_label(label_lookup, 'type', 'documentation')
elif any(word in context_lower for word in ['test', 'testing', 'spec', 'coverage']):
type_label = self._find_label(label_lookup, 'type', 'test')
elif any(word in context_lower for word in ['chore', 'maintenance', 'update', 'upgrade']):
type_label = self._find_label(label_lookup, 'type', 'chore')
if type_label:
suggested.append(type_label)
# Priority detection
priority_label = None
if any(word in context_lower for word in ['critical', 'urgent', 'blocker', 'blocking', 'emergency']):
priority_label = self._find_label(label_lookup, 'priority', 'critical')
elif any(word in context_lower for word in ['high', 'important', 'asap', 'soon']):
priority_label = self._find_label(label_lookup, 'priority', 'high')
elif any(word in context_lower for word in ['low', 'nice-to-have', 'optional', 'later']):
priority_label = self._find_label(label_lookup, 'priority', 'low')
else:
priority_label = self._find_label(label_lookup, 'priority', 'medium')
if priority_label:
suggested.append(priority_label)
# Complexity detection
complexity_label = None
if any(word in context_lower for word in ['simple', 'trivial', 'easy', 'quick']):
complexity_label = self._find_label(label_lookup, 'complexity', 'simple')
elif any(word in context_lower for word in ['complex', 'difficult', 'challenging', 'intricate']):
complexity_label = self._find_label(label_lookup, 'complexity', 'complex')
else:
complexity_label = self._find_label(label_lookup, 'complexity', 'medium')
if complexity_label:
suggested.append(complexity_label)
# Effort detection (supports both "Effort" and "Efforts" naming)
effort_label = None
if any(word in context_lower for word in ['xs', 'tiny', '1 hour', '2 hours']):
effort_label = self._find_label(label_lookup, 'effort', 'xs')
elif any(word in context_lower for word in ['small', 's ', '1 day', 'half day']):
effort_label = self._find_label(label_lookup, 'effort', 's')
elif any(word in context_lower for word in ['medium', 'm ', '2 days', '3 days']):
effort_label = self._find_label(label_lookup, 'effort', 'm')
elif any(word in context_lower for word in ['large', 'l ', '1 week', '5 days']):
effort_label = self._find_label(label_lookup, 'effort', 'l')
elif any(word in context_lower for word in ['xl', 'extra large', '2 weeks', 'sprint']):
effort_label = self._find_label(label_lookup, 'effort', 'xl')
if effort_label:
suggested.append(effort_label)
# Component detection (based on keywords)
component_mappings = {
'backend': ['backend', 'server', 'api', 'database', 'service'],
'frontend': ['frontend', 'ui', 'interface', 'react', 'vue', 'component'],
'api': ['api', 'endpoint', 'rest', 'graphql', 'route'],
'database': ['database', 'db', 'sql', 'migration', 'schema', 'postgres'],
'auth': ['auth', 'authentication', 'login', 'oauth', 'token', 'session'],
'deploy': ['deploy', 'deployment', 'docker', 'kubernetes', 'ci/cd'],
'testing': ['test', 'testing', 'spec', 'jest', 'pytest', 'coverage'],
'docs': ['docs', 'documentation', 'readme', 'guide', 'wiki']
}
for component, keywords in component_mappings.items():
if any(keyword in context_lower for keyword in keywords):
label = self._find_label(label_lookup, 'component', component)
if label and label not in suggested:
suggested.append(label)
# Tech stack detection
tech_mappings = {
'python': ['python', 'fastapi', 'django', 'flask', 'pytest'],
'javascript': ['javascript', 'js', 'node', 'npm', 'yarn'],
'docker': ['docker', 'dockerfile', 'container', 'compose'],
'postgresql': ['postgres', 'postgresql', 'psql', 'sql'],
'redis': ['redis', 'cache', 'session store'],
'vue': ['vue', 'vuejs', 'nuxt'],
'fastapi': ['fastapi', 'pydantic', 'starlette']
}
for tech, keywords in tech_mappings.items():
if any(keyword in context_lower for keyword in keywords):
label = self._find_label(label_lookup, 'tech', tech)
if label and label not in suggested:
suggested.append(label)
# Source detection (based on git branch or context)
source_label = None
if 'development' in context_lower or 'dev/' in context_lower:
source_label = self._find_label(label_lookup, 'source', 'development')
elif 'staging' in context_lower or 'stage/' in context_lower:
source_label = self._find_label(label_lookup, 'source', 'staging')
elif 'production' in context_lower or 'prod' in context_lower:
source_label = self._find_label(label_lookup, 'source', 'production')
if source_label:
suggested.append(source_label)
# Risk detection
risk_label = None
if any(word in context_lower for word in ['breaking', 'breaking change', 'major', 'risky']):
risk_label = self._find_label(label_lookup, 'risk', 'high')
elif any(word in context_lower for word in ['safe', 'low risk', 'minor']):
risk_label = self._find_label(label_lookup, 'risk', 'low')
if risk_label:
suggested.append(risk_label)
logger.info(f"Suggested {len(suggested)} labels based on context and {len(label_names)} available labels")
return suggested
def _build_label_lookup(self, label_names: List[str]) -> Dict[str, Dict[str, str]]:
"""
Build a lookup dictionary for label matching.
Supports various label formats:
- Slash format: Type/Bug, Priority/High
- Colon-space format: Type: Bug, Priority: High
- Colon format: Type:Bug
Args:
label_names: List of actual label names from repository
Returns:
Nested dict: {category: {value: actual_label_name}}
"""
lookup: Dict[str, Dict[str, str]] = {}
for label in label_names:
# Try different separator patterns
# Pattern: Category<separator>Value
# Separators: /, : , :
match = re.match(r'^([^/:]+)(?:/|:\s*|:)(.+)$', label)
if match:
category = match.group(1).lower().rstrip('s') # Normalize: "Efforts" -> "effort"
value = match.group(2).lower()
if category not in lookup:
lookup[category] = {}
lookup[category][value] = label
return lookup
def _find_label(self, lookup: Dict[str, Dict[str, str]], category: str, value: str) -> Optional[str]:
"""
Find actual label name from lookup.
Args:
lookup: Label lookup dictionary
category: Category to search (e.g., 'type', 'priority')
value: Value to find (e.g., 'bug', 'high')
Returns:
Actual label name if found, None otherwise
"""
category_lower = category.lower().rstrip('s') # Normalize
value_lower = value.lower()
if category_lower in lookup and value_lower in lookup[category_lower]:
return lookup[category_lower][value_lower]
return None
# Organization-level label categories (workflow labels shared across repos)
ORG_LABEL_CATEGORIES = {'agent', 'complexity', 'effort', 'efforts', 'priority', 'risk', 'source', 'type'}
# Repository-level label categories (project-specific labels)
REPO_LABEL_CATEGORIES = {'component', 'tech'}
async def create_label_smart(
self,
name: str,
color: str,
description: Optional[str] = None,
repo: Optional[str] = None
) -> Dict:
"""
Create a label at the appropriate level (org or repo) based on category.
Skips if label already exists (checks both org and repo levels).
Organization labels: Agent, Complexity, Effort, Priority, Risk, Source, Type
Repository labels: Component, Tech
Args:
name: Label name (e.g., 'Type/Bug', 'Component/Backend')
color: Hex color code
description: Optional label description
repo: Repository in 'owner/repo' format
Returns:
Created label dictionary with 'level' key, or 'skipped' if already exists
"""
loop = asyncio.get_event_loop()
target_repo = repo or self.gitea.repo
if not target_repo or '/' not in target_repo:
raise ValueError("Use 'owner/repo' format (e.g. 'org/repo-name')")
owner = target_repo.split('/')[0]
is_org = await loop.run_in_executor(
None,
lambda: self.gitea.is_org_repo(target_repo)
)
# Fetch existing labels to check for duplicates
existing_labels = await self.get_labels(target_repo)
all_existing = existing_labels.get('organization', []) + existing_labels.get('repository', [])
existing_names = [label['name'].lower() for label in all_existing]
# Normalize the new label name for comparison
name_normalized = name.lower()
# Also check for format variations (Type/Bug vs Type: Bug)
name_variations = [name_normalized]
if '/' in name:
name_variations.append(name.replace('/', ': ').lower())
name_variations.append(name.replace('/', ':').lower())
elif ': ' in name:
name_variations.append(name.replace(': ', '/').lower())
elif ':' in name:
name_variations.append(name.replace(':', '/').lower())
# Check if label already exists in any format
for variation in name_variations:
if variation in existing_names:
logger.info(f"Label '{name}' already exists (found as '{variation}'), skipping")
return {
'name': name,
'skipped': True,
'reason': f"Label already exists",
'level': 'existing'
}
# Parse category from label name
category = None
if '/' in name:
category = name.split('/')[0].lower().rstrip('s')
elif ':' in name:
category = name.split(':')[0].strip().lower().rstrip('s')
# If it's an org repo and the category is an org-level category, create at org level
if is_org and category in self.ORG_LABEL_CATEGORIES:
result = await loop.run_in_executor(
None,
lambda: self.gitea.create_org_label(owner, name, color, description)
)
# Handle unexpected response types (API may return list or non-dict)
if not isinstance(result, dict):
logger.error(f"Unexpected API response type for org label: {type(result)} - {result}")
return {
'name': name,
'error': True,
'reason': f"API returned {type(result).__name__} instead of dict: {result}",
'level': 'organization'
}
result['level'] = 'organization'
result['skipped'] = False
logger.info(f"Created organization label '{name}' in {owner}")
else:
# Create at repo level
result = await loop.run_in_executor(
None,
lambda: self.gitea.create_label(name, color, description, target_repo)
)
# Handle unexpected response types (API may return list or non-dict)
if not isinstance(result, dict):
logger.error(f"Unexpected API response type for repo label: {type(result)} - {result}")
return {
'name': name,
'error': True,
'reason': f"API returned {type(result).__name__} instead of dict: {result}",
'level': 'repository'
}
result['level'] = 'repository'
result['skipped'] = False
logger.info(f"Created repository label '{name}' in {target_repo}")
return result

View File

@@ -0,0 +1,145 @@
"""
Milestone management tools for MCP server.
Provides async wrappers for milestone operations:
- CRUD operations for milestones
- Milestone-sprint relationship tracking
"""
import asyncio
import logging
from typing import List, Dict, Optional
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class MilestoneTools:
"""Async wrappers for Gitea milestone operations"""
def __init__(self, gitea_client):
"""
Initialize milestone tools.
Args:
gitea_client: GiteaClient instance
"""
self.gitea = gitea_client
async def list_milestones(
self,
state: str = 'open',
repo: Optional[str] = None
) -> List[Dict]:
"""
List all milestones in repository.
Args:
state: Milestone state (open, closed, all)
repo: Repository in owner/repo format
Returns:
List of milestone dictionaries
"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None,
lambda: self.gitea.list_milestones(state, repo)
)
async def get_milestone(
self,
milestone_id: int,
repo: Optional[str] = None
) -> Dict:
"""
Get a specific milestone by ID.
Args:
milestone_id: Milestone ID
repo: Repository in owner/repo format
Returns:
Milestone dictionary
"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None,
lambda: self.gitea.get_milestone(milestone_id, repo)
)
async def create_milestone(
self,
title: str,
description: Optional[str] = None,
due_on: Optional[str] = None,
repo: Optional[str] = None
) -> Dict:
"""
Create a new milestone.
Args:
title: Milestone title (e.g., "v2.0 Release", "Sprint 17")
description: Milestone description
due_on: Due date in ISO 8601 format (e.g., "2025-02-01T00:00:00Z")
repo: Repository in owner/repo format
Returns:
Created milestone dictionary
"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None,
lambda: self.gitea.create_milestone(title, description, due_on, repo)
)
async def update_milestone(
self,
milestone_id: int,
title: Optional[str] = None,
description: Optional[str] = None,
state: Optional[str] = None,
due_on: Optional[str] = None,
repo: Optional[str] = None
) -> Dict:
"""
Update an existing milestone.
Args:
milestone_id: Milestone ID
title: New title (optional)
description: New description (optional)
state: New state - 'open' or 'closed' (optional)
due_on: New due date (optional)
repo: Repository in owner/repo format
Returns:
Updated milestone dictionary
"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None,
lambda: self.gitea.update_milestone(
milestone_id, title, description, state, due_on, repo
)
)
async def delete_milestone(
self,
milestone_id: int,
repo: Optional[str] = None
) -> bool:
"""
Delete a milestone.
Args:
milestone_id: Milestone ID
repo: Repository in owner/repo format
Returns:
True if deleted successfully
"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None,
lambda: self.gitea.delete_milestone(milestone_id, repo)
)

View File

@@ -0,0 +1,335 @@
"""
Pull request management tools for MCP server.
Provides async wrappers for PR operations with:
- Branch-aware security
- PMO multi-repo support
- Comprehensive error handling
"""
import asyncio
import os
import subprocess
import logging
from typing import List, Dict, Optional
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class PullRequestTools:
"""Async wrappers for Gitea pull request operations with branch detection"""
def __init__(self, gitea_client):
"""
Initialize pull request tools.
Args:
gitea_client: GiteaClient instance
"""
self.gitea = gitea_client
def _get_project_directory(self) -> Optional[str]:
"""
Get the user's project directory from environment.
Returns:
Project directory path or None if not set
"""
return os.environ.get('CLAUDE_PROJECT_DIR')
def _get_current_branch(self) -> str:
"""
Get current git branch from user's project directory.
Uses CLAUDE_PROJECT_DIR environment variable to determine the correct
directory for git operations, avoiding the bug where git runs from
the installed plugin directory instead of the user's project.
Returns:
Current branch name or 'unknown' if not in a git repo
"""
try:
project_dir = self._get_project_directory()
result = subprocess.run(
['git', 'rev-parse', '--abbrev-ref', 'HEAD'],
capture_output=True,
text=True,
check=True,
cwd=project_dir # Run git in project directory, not plugin directory
)
return result.stdout.strip()
except subprocess.CalledProcessError:
return "unknown"
def _check_branch_permissions(self, operation: str) -> bool:
"""
Check if operation is allowed on current branch.
Args:
operation: Operation name (list_prs, create_review, etc.)
Returns:
True if operation is allowed, False otherwise
"""
branch = self._get_current_branch()
# Read-only operations allowed everywhere
read_ops = ['list_pull_requests', 'get_pull_request', 'get_pr_diff', 'get_pr_comments']
# Production branches (read-only)
if branch in ['main', 'master'] or branch.startswith('prod/'):
return operation in read_ops
# Staging branches (read-only for PRs, can comment)
if branch == 'staging' or branch.startswith('stage/'):
return operation in read_ops + ['add_pr_comment']
# Development branches (full access)
# Include all common feature/fix branch patterns
dev_prefixes = (
'feat/', 'feature/', 'dev/',
'fix/', 'bugfix/', 'hotfix/',
'chore/', 'refactor/', 'docs/', 'test/'
)
if branch in ['development', 'develop'] or branch.startswith(dev_prefixes):
return True
# Unknown branch - be restrictive
return operation in read_ops
async def list_pull_requests(
self,
state: str = 'open',
sort: str = 'recentupdate',
labels: Optional[List[str]] = None,
repo: Optional[str] = None
) -> List[Dict]:
"""
List pull requests from repository (async wrapper).
Args:
state: PR state (open, closed, all)
sort: Sort order
labels: Filter by labels
repo: Override configured repo (for PMO multi-repo)
Returns:
List of pull request dictionaries
Raises:
PermissionError: If operation not allowed on current branch
"""
if not self._check_branch_permissions('list_pull_requests'):
branch = self._get_current_branch()
raise PermissionError(
f"Cannot list PRs on branch '{branch}'. "
f"Switch to a development branch."
)
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None,
lambda: self.gitea.list_pull_requests(state, sort, labels, repo)
)
async def get_pull_request(
self,
pr_number: int,
repo: Optional[str] = None
) -> Dict:
"""
Get specific pull request details (async wrapper).
Args:
pr_number: Pull request number
repo: Override configured repo (for PMO multi-repo)
Returns:
Pull request dictionary
Raises:
PermissionError: If operation not allowed on current branch
"""
if not self._check_branch_permissions('get_pull_request'):
branch = self._get_current_branch()
raise PermissionError(
f"Cannot get PR on branch '{branch}'. "
f"Switch to a development branch."
)
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None,
lambda: self.gitea.get_pull_request(pr_number, repo)
)
async def get_pr_diff(
self,
pr_number: int,
repo: Optional[str] = None
) -> str:
"""
Get pull request diff (async wrapper).
Args:
pr_number: Pull request number
repo: Override configured repo (for PMO multi-repo)
Returns:
Diff as string
Raises:
PermissionError: If operation not allowed on current branch
"""
if not self._check_branch_permissions('get_pr_diff'):
branch = self._get_current_branch()
raise PermissionError(
f"Cannot get PR diff on branch '{branch}'. "
f"Switch to a development branch."
)
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None,
lambda: self.gitea.get_pr_diff(pr_number, repo)
)
async def get_pr_comments(
self,
pr_number: int,
repo: Optional[str] = None
) -> List[Dict]:
"""
Get comments on a pull request (async wrapper).
Args:
pr_number: Pull request number
repo: Override configured repo (for PMO multi-repo)
Returns:
List of comment dictionaries
Raises:
PermissionError: If operation not allowed on current branch
"""
if not self._check_branch_permissions('get_pr_comments'):
branch = self._get_current_branch()
raise PermissionError(
f"Cannot get PR comments on branch '{branch}'. "
f"Switch to a development branch."
)
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None,
lambda: self.gitea.get_pr_comments(pr_number, repo)
)
async def create_pr_review(
self,
pr_number: int,
body: str,
event: str = 'COMMENT',
comments: Optional[List[Dict]] = None,
repo: Optional[str] = None
) -> Dict:
"""
Create a review on a pull request (async wrapper with branch check).
Args:
pr_number: Pull request number
body: Review body/summary
event: Review action (APPROVE, REQUEST_CHANGES, COMMENT)
comments: Optional list of inline comments
repo: Override configured repo (for PMO multi-repo)
Returns:
Created review dictionary
Raises:
PermissionError: If operation not allowed on current branch
"""
if not self._check_branch_permissions('create_pr_review'):
branch = self._get_current_branch()
raise PermissionError(
f"Cannot create PR review on branch '{branch}'. "
f"Switch to a development branch to review PRs."
)
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None,
lambda: self.gitea.create_pr_review(pr_number, body, event, comments, repo)
)
async def add_pr_comment(
self,
pr_number: int,
body: str,
repo: Optional[str] = None
) -> Dict:
"""
Add a general comment to a pull request (async wrapper with branch check).
Args:
pr_number: Pull request number
body: Comment text
repo: Override configured repo (for PMO multi-repo)
Returns:
Created comment dictionary
Raises:
PermissionError: If operation not allowed on current branch
"""
if not self._check_branch_permissions('add_pr_comment'):
branch = self._get_current_branch()
raise PermissionError(
f"Cannot add PR comment on branch '{branch}'. "
f"Switch to a development or staging branch to comment on PRs."
)
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None,
lambda: self.gitea.add_pr_comment(pr_number, body, repo)
)
async def create_pull_request(
self,
title: str,
body: str,
head: str,
base: str,
labels: Optional[List[str]] = None,
repo: Optional[str] = None
) -> Dict:
"""
Create a new pull request (async wrapper with branch check).
Args:
title: PR title
body: PR description/body
head: Source branch name (the branch with changes)
base: Target branch name (the branch to merge into)
labels: Optional list of label names
repo: Override configured repo (for PMO multi-repo)
Returns:
Created pull request dictionary
Raises:
PermissionError: If operation not allowed on current branch
"""
if not self._check_branch_permissions('create_pull_request'):
branch = self._get_current_branch()
raise PermissionError(
f"Cannot create PR on branch '{branch}'. "
f"Switch to a development or feature branch to create PRs."
)
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None,
lambda: self.gitea.create_pull_request(title, body, head, base, labels, repo)
)

187
gitea_mcp/tools/wiki.py Normal file
View File

@@ -0,0 +1,187 @@
"""
Wiki management tools for MCP server.
Provides async wrappers for wiki operations to support lessons learned:
- Page CRUD operations
- Lessons learned creation and search
- RFC number allocation
"""
import asyncio
import logging
import re
from typing import List, Dict, Optional
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class WikiTools:
"""Async wrappers for Gitea wiki operations"""
def __init__(self, gitea_client):
"""
Initialize wiki tools.
Args:
gitea_client: GiteaClient instance
"""
self.gitea = gitea_client
async def list_wiki_pages(self, repo: Optional[str] = None) -> List[Dict]:
"""List all wiki pages in repository."""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None,
lambda: self.gitea.list_wiki_pages(repo)
)
async def get_wiki_page(
self,
page_name: str,
repo: Optional[str] = None
) -> Dict:
"""Get a specific wiki page by name."""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None,
lambda: self.gitea.get_wiki_page(page_name, repo)
)
async def create_wiki_page(
self,
title: str,
content: str,
repo: Optional[str] = None
) -> Dict:
"""Create a new wiki page."""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None,
lambda: self.gitea.create_wiki_page(title, content, repo)
)
async def update_wiki_page(
self,
page_name: str,
content: str,
repo: Optional[str] = None
) -> Dict:
"""Update an existing wiki page."""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None,
lambda: self.gitea.update_wiki_page(page_name, content, repo)
)
async def delete_wiki_page(
self,
page_name: str,
repo: Optional[str] = None
) -> bool:
"""Delete a wiki page."""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None,
lambda: self.gitea.delete_wiki_page(page_name, repo)
)
async def search_wiki_pages(
self,
query: str,
repo: Optional[str] = None
) -> List[Dict]:
"""Search wiki pages by title."""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None,
lambda: self.gitea.search_wiki_pages(query, repo)
)
async def create_lesson(
self,
title: str,
content: str,
tags: List[str],
category: str = "sprints",
repo: Optional[str] = None
) -> Dict:
"""
Create a lessons learned entry in the wiki.
Args:
title: Lesson title (e.g., "Sprint 16 - Prevent Infinite Loops")
content: Lesson content in markdown
tags: List of tags for categorization
category: Category (sprints, patterns, architecture, etc.)
repo: Repository in owner/repo format
Returns:
Created wiki page
"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None,
lambda: self.gitea.create_lesson(title, content, tags, category, repo)
)
async def search_lessons(
self,
query: Optional[str] = None,
tags: Optional[List[str]] = None,
limit: int = 20,
repo: Optional[str] = None
) -> List[Dict]:
"""
Search lessons learned from previous sprints.
Args:
query: Search query (optional)
tags: Tags to filter by (optional)
limit: Maximum results (default 20)
repo: Repository in owner/repo format
Returns:
List of matching lessons
"""
loop = asyncio.get_event_loop()
results = await loop.run_in_executor(
None,
lambda: self.gitea.search_lessons(query, tags, repo)
)
return results[:limit]
async def allocate_rfc_number(self, repo: Optional[str] = None) -> Dict:
"""
Allocate the next available RFC number.
Scans existing wiki pages for RFC-NNNN pattern and returns
the next sequential number.
Args:
repo: Repository in owner/repo format
Returns:
Dict with 'next_number' (int) and 'formatted' (str like 'RFC-0001')
"""
pages = await self.list_wiki_pages(repo)
# Extract RFC numbers from page titles
rfc_numbers = []
rfc_pattern = re.compile(r'^RFC-(\d{4})')
for page in pages:
title = page.get('title', '')
match = rfc_pattern.match(title)
if match:
rfc_numbers.append(int(match.group(1)))
# Calculate next number
if rfc_numbers:
next_num = max(rfc_numbers) + 1
else:
next_num = 1
return {
'next_number': next_num,
'formatted': f'RFC-{next_num:04d}'
}

40
pyproject.toml Normal file
View File

@@ -0,0 +1,40 @@
[build-system]
requires = ["setuptools>=61.0", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "gitea-mcp"
version = "1.0.0"
description = "MCP server for Gitea — issues, PRs, wikis, milestones, merges"
readme = "README.md"
requires-python = ">=3.10"
license = {text = "MIT"}
authors = [
{ name = "Leo Miranda" }
]
keywords = ["mcp", "gitea", "claude", "tools", "model-context-protocol"]
dependencies = [
"mcp>=0.9.0",
"python-dotenv>=1.0.0",
"requests>=2.31.0",
"pydantic>=2.5.0",
]
[project.optional-dependencies]
dev = [
"pytest>=7.4.3",
"pytest-asyncio>=0.23.0",
"pytest-cov>=4.0.0",
]
[project.scripts]
gitea-mcp = "gitea_mcp.server:main"
[tool.setuptools.packages.find]
where = ["."]
include = ["gitea_mcp*"]
[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]

4
requirements.txt Normal file
View File

@@ -0,0 +1,4 @@
mcp>=0.9.0
python-dotenv>=1.0.0
requests>=2.31.0
pydantic>=2.5.0

0
tests/__init__.py Normal file
View File

259
tests/test_config.py Normal file
View File

@@ -0,0 +1,259 @@
"""
Unit tests for configuration loader.
"""
import pytest
from pathlib import Path
import os
from gitea_mcp.config import GiteaConfig
def test_load_system_config(tmp_path, monkeypatch):
"""Test loading system-level configuration"""
# Mock home directory
config_dir = tmp_path / '.config' / 'claude'
config_dir.mkdir(parents=True)
config_file = config_dir / 'gitea.env'
config_file.write_text(
"GITEA_API_URL=https://test.com/api/v1\n"
"GITEA_API_TOKEN=test_token\n"
"GITEA_OWNER=test_owner\n"
)
monkeypatch.setenv('HOME', str(tmp_path))
monkeypatch.chdir(tmp_path)
config = GiteaConfig()
result = config.load()
assert result['api_url'] == 'https://test.com/api/v1'
assert result['api_token'] == 'test_token'
assert result['mode'] == 'company' # No repo specified
assert result['repo'] is None
def test_project_config_override(tmp_path, monkeypatch):
"""Test that project config overrides system config"""
# Set up system config
system_config_dir = tmp_path / '.config' / 'claude'
system_config_dir.mkdir(parents=True)
system_config = system_config_dir / 'gitea.env'
system_config.write_text(
"GITEA_API_URL=https://test.com/api/v1\n"
"GITEA_API_TOKEN=test_token\n"
"GITEA_OWNER=test_owner\n"
)
# Set up project config
project_dir = tmp_path / 'project'
project_dir.mkdir()
project_config = project_dir / '.env'
project_config.write_text("GITEA_REPO=test_repo\n")
monkeypatch.setenv('HOME', str(tmp_path))
monkeypatch.chdir(project_dir)
config = GiteaConfig()
result = config.load()
assert result['repo'] == 'test_repo'
assert result['mode'] == 'project'
def test_missing_system_config(tmp_path, monkeypatch):
"""Test error handling for missing system configuration"""
monkeypatch.setenv('HOME', str(tmp_path))
monkeypatch.chdir(tmp_path)
with pytest.raises(FileNotFoundError) as exc_info:
config = GiteaConfig()
config.load()
assert "System config not found" in str(exc_info.value)
def test_missing_required_config(tmp_path, monkeypatch):
"""Test error handling for missing required variables"""
# Clear environment variables
for var in ['GITEA_API_URL', 'GITEA_API_TOKEN', 'GITEA_OWNER', 'GITEA_REPO']:
monkeypatch.delenv(var, raising=False)
# Create incomplete config
config_dir = tmp_path / '.config' / 'claude'
config_dir.mkdir(parents=True)
config_file = config_dir / 'gitea.env'
config_file.write_text(
"GITEA_API_URL=https://test.com/api/v1\n"
# Missing GITEA_API_TOKEN and GITEA_OWNER
)
monkeypatch.setenv('HOME', str(tmp_path))
monkeypatch.chdir(tmp_path)
with pytest.raises(ValueError) as exc_info:
config = GiteaConfig()
config.load()
assert "Missing required configuration" in str(exc_info.value)
def test_mode_detection_project(tmp_path, monkeypatch):
"""Test mode detection for project mode"""
config_dir = tmp_path / '.config' / 'claude'
config_dir.mkdir(parents=True)
config_file = config_dir / 'gitea.env'
config_file.write_text(
"GITEA_API_URL=https://test.com/api/v1\n"
"GITEA_API_TOKEN=test_token\n"
"GITEA_OWNER=test_owner\n"
"GITEA_REPO=test_repo\n"
)
monkeypatch.setenv('HOME', str(tmp_path))
monkeypatch.chdir(tmp_path)
config = GiteaConfig()
result = config.load()
assert result['mode'] == 'project'
assert result['repo'] == 'test_repo'
def test_mode_detection_company(tmp_path, monkeypatch):
"""Test mode detection for company mode (PMO)"""
# Clear environment variables, especially GITEA_REPO
for var in ['GITEA_API_URL', 'GITEA_API_TOKEN', 'GITEA_OWNER', 'GITEA_REPO']:
monkeypatch.delenv(var, raising=False)
config_dir = tmp_path / '.config' / 'claude'
config_dir.mkdir(parents=True)
config_file = config_dir / 'gitea.env'
config_file.write_text(
"GITEA_API_URL=https://test.com/api/v1\n"
"GITEA_API_TOKEN=test_token\n"
"GITEA_OWNER=test_owner\n"
# No GITEA_REPO
)
monkeypatch.setenv('HOME', str(tmp_path))
monkeypatch.chdir(tmp_path)
config = GiteaConfig()
result = config.load()
assert result['mode'] == 'company'
assert result['repo'] is None
# ========================================
# GIT URL PARSING TESTS
# ========================================
def test_parse_git_url_ssh_format():
"""Test parsing SSH format git URL"""
config = GiteaConfig()
# SSH with port: ssh://git@host:port/owner/repo.git
url = "ssh://git@hotserv.tailc9b278.ts.net:2222/personal-projects/personal-portfolio.git"
result = config._parse_git_url(url)
assert result == "personal-projects/personal-portfolio"
def test_parse_git_url_ssh_short_format():
"""Test parsing SSH short format git URL"""
config = GiteaConfig()
# SSH short: git@host:owner/repo.git
url = "git@github.com:owner/repo.git"
result = config._parse_git_url(url)
assert result == "owner/repo"
def test_parse_git_url_https_format():
"""Test parsing HTTPS format git URL"""
config = GiteaConfig()
# HTTPS: https://host/owner/repo.git
url = "https://gitea.hotserv.cloud/personal-projects/leo-claude-mktplace.git"
result = config._parse_git_url(url)
assert result == "personal-projects/leo-claude-mktplace"
def test_parse_git_url_http_format():
"""Test parsing HTTP format git URL"""
config = GiteaConfig()
# HTTP: http://host/owner/repo.git
url = "http://gitea.hotserv.cloud/personal-projects/repo.git"
result = config._parse_git_url(url)
assert result == "personal-projects/repo"
def test_parse_git_url_without_git_suffix():
"""Test parsing git URL without .git suffix"""
config = GiteaConfig()
url = "https://github.com/owner/repo"
result = config._parse_git_url(url)
assert result == "owner/repo"
def test_parse_git_url_invalid_format():
"""Test parsing invalid git URL returns None"""
config = GiteaConfig()
url = "not-a-valid-url"
result = config._parse_git_url(url)
assert result is None
def test_find_project_directory_from_env(tmp_path, monkeypatch):
"""Test finding project directory from CLAUDE_PROJECT_DIR env var"""
project_dir = tmp_path / 'my-project'
project_dir.mkdir()
(project_dir / '.git').mkdir()
monkeypatch.setenv('CLAUDE_PROJECT_DIR', str(project_dir))
config = GiteaConfig()
result = config._find_project_directory()
assert result == project_dir
def test_find_project_directory_from_cwd(tmp_path, monkeypatch):
"""Test finding project directory from cwd with .env file"""
project_dir = tmp_path / 'project'
project_dir.mkdir()
(project_dir / '.env').write_text("GITEA_REPO=test/repo")
monkeypatch.chdir(project_dir)
# Clear env vars that might interfere
monkeypatch.delenv('CLAUDE_PROJECT_DIR', raising=False)
monkeypatch.delenv('PWD', raising=False)
config = GiteaConfig()
result = config._find_project_directory()
assert result == project_dir
def test_find_project_directory_none_when_no_markers(tmp_path, monkeypatch):
"""Test returns None when no project markers found"""
empty_dir = tmp_path / 'empty'
empty_dir.mkdir()
monkeypatch.chdir(empty_dir)
monkeypatch.delenv('CLAUDE_PROJECT_DIR', raising=False)
monkeypatch.delenv('PWD', raising=False)
monkeypatch.delenv('GITEA_REPO', raising=False)
config = GiteaConfig()
result = config._find_project_directory()
assert result is None

270
tests/test_gitea_client.py Normal file
View File

@@ -0,0 +1,270 @@
"""
Unit tests for Gitea API client.
"""
import pytest
from unittest.mock import Mock, patch, MagicMock
from gitea_mcp.gitea_client import GiteaClient
@pytest.fixture
def mock_config():
"""Fixture providing mocked configuration"""
with patch('gitea_mcp.gitea_client.GiteaConfig') as mock_cfg:
mock_instance = mock_cfg.return_value
mock_instance.load.return_value = {
'api_url': 'https://test.com/api/v1',
'api_token': 'test_token',
'repo': 'test_owner/test_repo', # Combined owner/repo format
'mode': 'project'
}
yield mock_cfg
@pytest.fixture
def gitea_client(mock_config):
"""Fixture providing GiteaClient instance with mocked config"""
return GiteaClient()
def test_client_initialization(gitea_client):
"""Test client initializes with correct configuration"""
assert gitea_client.base_url == 'https://test.com/api/v1'
assert gitea_client.token == 'test_token'
assert gitea_client.repo == 'test_owner/test_repo' # Combined format
assert gitea_client.mode == 'project'
assert 'Authorization' in gitea_client.session.headers
assert gitea_client.session.headers['Authorization'] == 'token test_token'
def test_list_issues(gitea_client):
"""Test listing issues"""
mock_response = Mock()
mock_response.json.return_value = [
{'number': 1, 'title': 'Test Issue 1'},
{'number': 2, 'title': 'Test Issue 2'}
]
mock_response.raise_for_status = Mock()
with patch.object(gitea_client.session, 'get', return_value=mock_response):
issues = gitea_client.list_issues(state='open')
assert len(issues) == 2
assert issues[0]['title'] == 'Test Issue 1'
gitea_client.session.get.assert_called_once()
def test_list_issues_with_labels(gitea_client):
"""Test listing issues with label filter"""
mock_response = Mock()
mock_response.json.return_value = [{'number': 1, 'title': 'Bug Issue'}]
mock_response.raise_for_status = Mock()
with patch.object(gitea_client.session, 'get', return_value=mock_response):
issues = gitea_client.list_issues(state='open', labels=['Type/Bug'])
gitea_client.session.get.assert_called_once()
call_args = gitea_client.session.get.call_args
assert call_args[1]['params']['labels'] == 'Type/Bug'
def test_get_issue(gitea_client):
"""Test getting specific issue"""
mock_response = Mock()
mock_response.json.return_value = {'number': 1, 'title': 'Test Issue'}
mock_response.raise_for_status = Mock()
with patch.object(gitea_client.session, 'get', return_value=mock_response):
issue = gitea_client.get_issue(1)
assert issue['number'] == 1
assert issue['title'] == 'Test Issue'
def test_create_issue(gitea_client):
"""Test creating new issue"""
mock_response = Mock()
mock_response.json.return_value = {
'number': 1,
'title': 'New Issue',
'body': 'Issue body'
}
mock_response.raise_for_status = Mock()
# Mock is_org_repo to avoid network call during label resolution
with patch.object(gitea_client, 'is_org_repo', return_value=True):
# Mock get_org_labels and get_labels for label resolution
with patch.object(gitea_client, 'get_org_labels', return_value=[{'name': 'Type/Bug', 'id': 1}]):
with patch.object(gitea_client, 'get_labels', return_value=[]):
with patch.object(gitea_client.session, 'post', return_value=mock_response):
issue = gitea_client.create_issue(
title='New Issue',
body='Issue body',
labels=['Type/Bug']
)
assert issue['title'] == 'New Issue'
gitea_client.session.post.assert_called_once()
def test_update_issue(gitea_client):
"""Test updating existing issue"""
mock_response = Mock()
mock_response.json.return_value = {
'number': 1,
'title': 'Updated Issue'
}
mock_response.raise_for_status = Mock()
with patch.object(gitea_client.session, 'patch', return_value=mock_response):
issue = gitea_client.update_issue(
issue_number=1,
title='Updated Issue'
)
assert issue['title'] == 'Updated Issue'
gitea_client.session.patch.assert_called_once()
def test_add_comment(gitea_client):
"""Test adding comment to issue"""
mock_response = Mock()
mock_response.json.return_value = {'body': 'Test comment'}
mock_response.raise_for_status = Mock()
with patch.object(gitea_client.session, 'post', return_value=mock_response):
comment = gitea_client.add_comment(1, 'Test comment')
assert comment['body'] == 'Test comment'
gitea_client.session.post.assert_called_once()
def test_get_labels(gitea_client):
"""Test getting repository labels"""
mock_response = Mock()
mock_response.json.return_value = [
{'name': 'Type/Bug'},
{'name': 'Priority/High'}
]
mock_response.raise_for_status = Mock()
with patch.object(gitea_client.session, 'get', return_value=mock_response):
labels = gitea_client.get_labels()
assert len(labels) == 2
assert labels[0]['name'] == 'Type/Bug'
def test_get_org_labels(gitea_client):
"""Test getting organization labels"""
mock_response = Mock()
mock_response.json.return_value = [
{'name': 'Type/Bug'},
{'name': 'Type/Feature'}
]
mock_response.raise_for_status = Mock()
with patch.object(gitea_client.session, 'get', return_value=mock_response):
labels = gitea_client.get_org_labels(org='test_owner')
assert len(labels) == 2
def test_list_repos(gitea_client):
"""Test listing organization repositories (PMO mode)"""
mock_response = Mock()
mock_response.json.return_value = [
{'name': 'repo1'},
{'name': 'repo2'}
]
mock_response.raise_for_status = Mock()
with patch.object(gitea_client.session, 'get', return_value=mock_response):
repos = gitea_client.list_repos(org='test_owner')
assert len(repos) == 2
assert repos[0]['name'] == 'repo1'
def test_aggregate_issues(gitea_client):
"""Test aggregating issues across repositories (PMO mode)"""
# Mock list_repos
gitea_client.list_repos = Mock(return_value=[
{'name': 'repo1'},
{'name': 'repo2'}
])
# Mock list_issues
gitea_client.list_issues = Mock(side_effect=[
[{'number': 1, 'title': 'Issue 1'}], # repo1
[{'number': 2, 'title': 'Issue 2'}] # repo2
])
aggregated = gitea_client.aggregate_issues(org='test_owner', state='open')
assert 'repo1' in aggregated
assert 'repo2' in aggregated
assert len(aggregated['repo1']) == 1
assert len(aggregated['repo2']) == 1
def test_no_repo_specified_error(gitea_client):
"""Test error when repository not specified or invalid format"""
# Create client without repo
with patch('gitea_mcp.gitea_client.GiteaConfig') as mock_cfg:
mock_instance = mock_cfg.return_value
mock_instance.load.return_value = {
'api_url': 'https://test.com/api/v1',
'api_token': 'test_token',
'repo': None, # No repo
'mode': 'company'
}
client = GiteaClient()
with pytest.raises(ValueError) as exc_info:
client.list_issues()
assert "Use 'owner/repo' format" in str(exc_info.value)
# ========================================
# ORGANIZATION DETECTION TESTS
# ========================================
def test_is_organization_true(gitea_client):
"""Test _is_organization returns True for valid organization"""
mock_response = Mock()
mock_response.status_code = 200
with patch.object(gitea_client.session, 'get', return_value=mock_response):
result = gitea_client._is_organization('personal-projects')
assert result is True
gitea_client.session.get.assert_called_once_with(
'https://test.com/api/v1/orgs/personal-projects'
)
def test_is_organization_false(gitea_client):
"""Test _is_organization returns False for user account"""
mock_response = Mock()
mock_response.status_code = 404
with patch.object(gitea_client.session, 'get', return_value=mock_response):
result = gitea_client._is_organization('lmiranda')
assert result is False
def test_is_org_repo_uses_orgs_endpoint(gitea_client):
"""Test is_org_repo uses /orgs endpoint instead of owner.type"""
mock_response = Mock()
mock_response.status_code = 200
with patch.object(gitea_client.session, 'get', return_value=mock_response):
result = gitea_client.is_org_repo('personal-projects/repo')
assert result is True
# Should call /orgs/personal-projects, not /repos/.../
gitea_client.session.get.assert_called_once_with(
'https://test.com/api/v1/orgs/personal-projects'
)

163
tests/test_issues.py Normal file
View File

@@ -0,0 +1,163 @@
"""
Unit tests for issue tools with branch detection.
"""
import pytest
from unittest.mock import Mock, patch, AsyncMock
from gitea_mcp.tools.issues import IssueTools
@pytest.fixture
def mock_gitea_client():
"""Fixture providing mocked Gitea client"""
client = Mock()
client.mode = 'project'
return client
@pytest.fixture
def issue_tools(mock_gitea_client):
"""Fixture providing IssueTools instance"""
return IssueTools(mock_gitea_client)
@pytest.mark.asyncio
async def test_list_issues_development_branch(issue_tools):
"""Test listing issues on development branch (allowed)"""
with patch.object(issue_tools, '_get_current_branch', return_value='feat/test-feature'):
issue_tools.gitea.list_issues = Mock(return_value=[{'number': 1}])
issues = await issue_tools.list_issues(state='open')
assert len(issues) == 1
issue_tools.gitea.list_issues.assert_called_once()
@pytest.mark.asyncio
async def test_create_issue_development_branch(issue_tools):
"""Test creating issue on development branch (allowed)"""
with patch.object(issue_tools, '_get_current_branch', return_value='development'):
issue_tools.gitea.create_issue = Mock(return_value={'number': 1})
issue = await issue_tools.create_issue('Test', 'Body')
assert issue['number'] == 1
issue_tools.gitea.create_issue.assert_called_once()
@pytest.mark.asyncio
async def test_create_issue_main_branch_blocked(issue_tools):
"""Test creating issue on main branch (blocked)"""
with patch.object(issue_tools, '_get_current_branch', return_value='main'):
with pytest.raises(PermissionError) as exc_info:
await issue_tools.create_issue('Test', 'Body')
assert "Cannot create issues on branch 'main'" in str(exc_info.value)
@pytest.mark.asyncio
async def test_create_issue_staging_branch_allowed(issue_tools):
"""Test creating issue on staging branch (allowed for documentation)"""
with patch.object(issue_tools, '_get_current_branch', return_value='staging'):
issue_tools.gitea.create_issue = Mock(return_value={'number': 1})
issue = await issue_tools.create_issue('Test', 'Body')
assert issue['number'] == 1
@pytest.mark.asyncio
async def test_update_issue_main_branch_blocked(issue_tools):
"""Test updating issue on main branch (blocked)"""
with patch.object(issue_tools, '_get_current_branch', return_value='main'):
with pytest.raises(PermissionError) as exc_info:
await issue_tools.update_issue(1, title='Updated')
assert "Cannot update issues on branch 'main'" in str(exc_info.value)
@pytest.mark.asyncio
async def test_list_issues_main_branch_allowed(issue_tools):
"""Test listing issues on main branch (allowed - read-only)"""
with patch.object(issue_tools, '_get_current_branch', return_value='main'):
issue_tools.gitea.list_issues = Mock(return_value=[{'number': 1}])
issues = await issue_tools.list_issues(state='open')
assert len(issues) == 1
@pytest.mark.asyncio
async def test_get_issue(issue_tools):
"""Test getting specific issue"""
with patch.object(issue_tools, '_get_current_branch', return_value='development'):
issue_tools.gitea.get_issue = Mock(return_value={'number': 1, 'title': 'Test'})
issue = await issue_tools.get_issue(1)
assert issue['number'] == 1
@pytest.mark.asyncio
async def test_add_comment(issue_tools):
"""Test adding comment to issue"""
with patch.object(issue_tools, '_get_current_branch', return_value='development'):
issue_tools.gitea.add_comment = Mock(return_value={'body': 'Test comment'})
comment = await issue_tools.add_comment(1, 'Test comment')
assert comment['body'] == 'Test comment'
@pytest.mark.asyncio
async def test_aggregate_issues_company_mode(issue_tools):
"""Test aggregating issues in company mode"""
issue_tools.gitea.mode = 'company'
with patch.object(issue_tools, '_get_current_branch', return_value='development'):
issue_tools.gitea.aggregate_issues = Mock(return_value={
'repo1': [{'number': 1}],
'repo2': [{'number': 2}]
})
aggregated = await issue_tools.aggregate_issues(org='test_owner')
assert 'repo1' in aggregated
assert 'repo2' in aggregated
@pytest.mark.asyncio
async def test_aggregate_issues_project_mode(issue_tools):
"""Test that aggregate_issues works in project mode with org argument"""
issue_tools.gitea.mode = 'project'
with patch.object(issue_tools, '_get_current_branch', return_value='development'):
issue_tools.gitea.aggregate_issues = Mock(return_value={
'repo1': [{'number': 1}]
})
# aggregate_issues now works in any mode when org is provided
aggregated = await issue_tools.aggregate_issues(org='test_owner')
assert 'repo1' in aggregated
def test_branch_detection():
"""Test branch detection logic"""
tools = IssueTools(Mock())
# Test development branches
with patch.object(tools, '_get_current_branch', return_value='development'):
assert tools._check_branch_permissions('create_issue') is True
with patch.object(tools, '_get_current_branch', return_value='feat/new-feature'):
assert tools._check_branch_permissions('create_issue') is True
# Test production branches
with patch.object(tools, '_get_current_branch', return_value='main'):
assert tools._check_branch_permissions('create_issue') is False
assert tools._check_branch_permissions('list_issues') is True
# Test staging branches
with patch.object(tools, '_get_current_branch', return_value='staging'):
assert tools._check_branch_permissions('create_issue') is True
assert tools._check_branch_permissions('update_issue') is False

478
tests/test_labels.py Normal file
View File

@@ -0,0 +1,478 @@
"""
Unit tests for label tools with suggestion logic.
"""
import pytest
from unittest.mock import Mock, patch
from gitea_mcp.tools.labels import LabelTools
@pytest.fixture
def mock_gitea_client():
"""Fixture providing mocked Gitea client"""
client = Mock()
client.repo = 'test_org/test_repo'
client.is_org_repo = Mock(return_value=True)
return client
@pytest.fixture
def label_tools(mock_gitea_client):
"""Fixture providing LabelTools instance"""
return LabelTools(mock_gitea_client)
@pytest.mark.asyncio
async def test_get_labels(label_tools):
"""Test getting all labels (org + repo)"""
label_tools.gitea.get_org_labels = Mock(return_value=[
{'name': 'Type/Bug'},
{'name': 'Type/Feature'}
])
label_tools.gitea.get_labels = Mock(return_value=[
{'name': 'Component/Backend'},
{'name': 'Component/Frontend'}
])
result = await label_tools.get_labels()
assert len(result['organization']) == 2
assert len(result['repository']) == 2
assert result['total_count'] == 4
# ========================================
# LABEL LOOKUP TESTS (NEW)
# ========================================
def test_build_label_lookup_slash_format():
"""Test building label lookup with slash format labels"""
mock_client = Mock()
mock_client.repo = 'test/repo'
tools = LabelTools(mock_client)
labels = ['Type/Bug', 'Type/Feature', 'Priority/High', 'Priority/Low']
lookup = tools._build_label_lookup(labels)
assert 'type' in lookup
assert 'bug' in lookup['type']
assert lookup['type']['bug'] == 'Type/Bug'
assert lookup['type']['feature'] == 'Type/Feature'
assert 'priority' in lookup
assert lookup['priority']['high'] == 'Priority/High'
def test_build_label_lookup_colon_space_format():
"""Test building label lookup with colon-space format labels"""
mock_client = Mock()
mock_client.repo = 'test/repo'
tools = LabelTools(mock_client)
labels = ['Type: Bug', 'Type: Feature', 'Priority: High', 'Effort: M']
lookup = tools._build_label_lookup(labels)
assert 'type' in lookup
assert 'bug' in lookup['type']
assert lookup['type']['bug'] == 'Type: Bug'
assert lookup['type']['feature'] == 'Type: Feature'
assert 'priority' in lookup
assert lookup['priority']['high'] == 'Priority: High'
# Test singular "Effort" (not "Efforts")
assert 'effort' in lookup
assert lookup['effort']['m'] == 'Effort: M'
def test_build_label_lookup_efforts_normalization():
"""Test that 'Efforts' is normalized to 'effort' for matching"""
mock_client = Mock()
mock_client.repo = 'test/repo'
tools = LabelTools(mock_client)
labels = ['Efforts/XS', 'Efforts/S', 'Efforts/M']
lookup = tools._build_label_lookup(labels)
# 'Efforts' should be normalized to 'effort'
assert 'effort' in lookup
assert lookup['effort']['xs'] == 'Efforts/XS'
def test_find_label():
"""Test finding labels from lookup"""
mock_client = Mock()
mock_client.repo = 'test/repo'
tools = LabelTools(mock_client)
lookup = {
'type': {'bug': 'Type: Bug', 'feature': 'Type: Feature'},
'priority': {'high': 'Priority: High', 'low': 'Priority: Low'}
}
assert tools._find_label(lookup, 'type', 'bug') == 'Type: Bug'
assert tools._find_label(lookup, 'priority', 'high') == 'Priority: High'
assert tools._find_label(lookup, 'type', 'nonexistent') is None
assert tools._find_label(lookup, 'nonexistent', 'bug') is None
# ========================================
# SUGGEST LABELS WITH DYNAMIC FORMAT TESTS
# ========================================
def _create_tools_with_labels(labels):
"""Helper to create LabelTools with mocked labels"""
import asyncio
mock_client = Mock()
mock_client.repo = 'test/repo'
mock_client.is_org_repo = Mock(return_value=False)
mock_client.get_labels = Mock(return_value=[{'name': l} for l in labels])
return LabelTools(mock_client)
@pytest.mark.asyncio
async def test_suggest_labels_with_slash_format():
"""Test label suggestion with slash format labels"""
labels = [
'Type/Bug', 'Type/Feature', 'Type/Refactor',
'Priority/Critical', 'Priority/High', 'Priority/Medium', 'Priority/Low',
'Complexity/Simple', 'Complexity/Medium', 'Complexity/Complex',
'Component/Auth'
]
tools = _create_tools_with_labels(labels)
context = "Fix critical bug in login authentication"
suggestions = await tools.suggest_labels(context)
assert 'Type/Bug' in suggestions
assert 'Priority/Critical' in suggestions
assert 'Component/Auth' in suggestions
@pytest.mark.asyncio
async def test_suggest_labels_with_colon_space_format():
"""Test label suggestion with colon-space format labels"""
labels = [
'Type: Bug', 'Type: Feature', 'Type: Refactor',
'Priority: Critical', 'Priority: High', 'Priority: Medium', 'Priority: Low',
'Complexity: Simple', 'Complexity: Medium', 'Complexity: Complex',
'Effort: XS', 'Effort: S', 'Effort: M', 'Effort: L', 'Effort: XL'
]
tools = _create_tools_with_labels(labels)
context = "Fix critical bug for tiny 1 hour fix"
suggestions = await tools.suggest_labels(context)
# Should return colon-space format labels
assert 'Type: Bug' in suggestions
assert 'Priority: Critical' in suggestions
assert 'Effort: XS' in suggestions
@pytest.mark.asyncio
async def test_suggest_labels_bug():
"""Test label suggestion for bug context"""
labels = [
'Type/Bug', 'Type/Feature',
'Priority/Critical', 'Priority/High', 'Priority/Medium', 'Priority/Low',
'Complexity/Simple', 'Complexity/Medium', 'Complexity/Complex',
'Component/Auth'
]
tools = _create_tools_with_labels(labels)
context = "Fix critical bug in login authentication"
suggestions = await tools.suggest_labels(context)
assert 'Type/Bug' in suggestions
assert 'Priority/Critical' in suggestions
assert 'Component/Auth' in suggestions
@pytest.mark.asyncio
async def test_suggest_labels_feature():
"""Test label suggestion for feature context"""
labels = ['Type/Feature', 'Priority/Medium', 'Complexity/Medium']
tools = _create_tools_with_labels(labels)
context = "Add new feature to implement user dashboard"
suggestions = await tools.suggest_labels(context)
assert 'Type/Feature' in suggestions
assert any('Priority' in label for label in suggestions)
@pytest.mark.asyncio
async def test_suggest_labels_refactor():
"""Test label suggestion for refactor context"""
labels = ['Type/Refactor', 'Priority/Medium', 'Complexity/Medium', 'Component/Backend']
tools = _create_tools_with_labels(labels)
context = "Refactor architecture to extract service layer"
suggestions = await tools.suggest_labels(context)
assert 'Type/Refactor' in suggestions
assert 'Component/Backend' in suggestions
@pytest.mark.asyncio
async def test_suggest_labels_documentation():
"""Test label suggestion for documentation context"""
labels = ['Type/Documentation', 'Priority/Medium', 'Complexity/Medium', 'Component/API', 'Component/Docs']
tools = _create_tools_with_labels(labels)
context = "Update documentation for API endpoints"
suggestions = await tools.suggest_labels(context)
assert 'Type/Documentation' in suggestions
assert 'Component/API' in suggestions or 'Component/Docs' in suggestions
@pytest.mark.asyncio
async def test_suggest_labels_priority():
"""Test priority detection in suggestions"""
labels = ['Type/Feature', 'Priority/Critical', 'Priority/High', 'Priority/Medium', 'Priority/Low', 'Complexity/Medium']
tools = _create_tools_with_labels(labels)
# Critical priority
context = "Urgent blocker in production"
suggestions = await tools.suggest_labels(context)
assert 'Priority/Critical' in suggestions
# High priority
context = "Important feature needed asap"
suggestions = await tools.suggest_labels(context)
assert 'Priority/High' in suggestions
# Low priority
context = "Nice-to-have optional improvement"
suggestions = await tools.suggest_labels(context)
assert 'Priority/Low' in suggestions
@pytest.mark.asyncio
async def test_suggest_labels_complexity():
"""Test complexity detection in suggestions"""
labels = ['Type/Feature', 'Priority/Medium', 'Complexity/Simple', 'Complexity/Medium', 'Complexity/Complex']
tools = _create_tools_with_labels(labels)
# Simple complexity
context = "Simple quick fix for typo"
suggestions = await tools.suggest_labels(context)
assert 'Complexity/Simple' in suggestions
# Complex complexity
context = "Complex challenging architecture redesign"
suggestions = await tools.suggest_labels(context)
assert 'Complexity/Complex' in suggestions
@pytest.mark.asyncio
async def test_suggest_labels_efforts():
"""Test efforts detection in suggestions"""
labels = ['Type/Feature', 'Priority/Medium', 'Complexity/Medium', 'Efforts/XS', 'Efforts/S', 'Efforts/M', 'Efforts/L', 'Efforts/XL']
tools = _create_tools_with_labels(labels)
# XS effort
context = "Tiny fix that takes 1 hour"
suggestions = await tools.suggest_labels(context)
assert 'Efforts/XS' in suggestions
# L effort
context = "Large feature taking 1 week"
suggestions = await tools.suggest_labels(context)
assert 'Efforts/L' in suggestions
@pytest.mark.asyncio
async def test_suggest_labels_components():
"""Test component detection in suggestions"""
labels = ['Type/Feature', 'Priority/Medium', 'Complexity/Medium', 'Component/Backend', 'Component/Frontend', 'Component/API', 'Component/Database']
tools = _create_tools_with_labels(labels)
# Backend component
context = "Update backend API service"
suggestions = await tools.suggest_labels(context)
assert 'Component/Backend' in suggestions
assert 'Component/API' in suggestions
# Frontend component
context = "Fix frontend UI component"
suggestions = await tools.suggest_labels(context)
assert 'Component/Frontend' in suggestions
# Database component
context = "Add database migration for schema"
suggestions = await tools.suggest_labels(context)
assert 'Component/Database' in suggestions
@pytest.mark.asyncio
async def test_suggest_labels_tech_stack():
"""Test tech stack detection in suggestions"""
labels = ['Type/Feature', 'Priority/Medium', 'Complexity/Medium', 'Tech/Python', 'Tech/FastAPI', 'Tech/Docker', 'Tech/PostgreSQL']
tools = _create_tools_with_labels(labels)
# Python
context = "Update Python FastAPI endpoint"
suggestions = await tools.suggest_labels(context)
assert 'Tech/Python' in suggestions
assert 'Tech/FastAPI' in suggestions
# Docker
context = "Fix Dockerfile configuration"
suggestions = await tools.suggest_labels(context)
assert 'Tech/Docker' in suggestions
# PostgreSQL
context = "Optimize PostgreSQL query"
suggestions = await tools.suggest_labels(context)
assert 'Tech/PostgreSQL' in suggestions
@pytest.mark.asyncio
async def test_suggest_labels_source():
"""Test source detection in suggestions"""
labels = ['Type/Feature', 'Priority/Medium', 'Complexity/Medium', 'Source/Development', 'Source/Staging', 'Source/Production']
tools = _create_tools_with_labels(labels)
# Development
context = "Issue found in development environment"
suggestions = await tools.suggest_labels(context)
assert 'Source/Development' in suggestions
# Production
context = "Critical production issue"
suggestions = await tools.suggest_labels(context)
assert 'Source/Production' in suggestions
@pytest.mark.asyncio
async def test_suggest_labels_risk():
"""Test risk detection in suggestions"""
labels = ['Type/Feature', 'Priority/Medium', 'Complexity/Medium', 'Risk/High', 'Risk/Low']
tools = _create_tools_with_labels(labels)
# High risk
context = "Breaking change to major API"
suggestions = await tools.suggest_labels(context)
assert 'Risk/High' in suggestions
# Low risk
context = "Safe minor update with low risk"
suggestions = await tools.suggest_labels(context)
assert 'Risk/Low' in suggestions
@pytest.mark.asyncio
async def test_suggest_labels_multiple_categories():
"""Test that suggestions span multiple categories"""
labels = [
'Type/Bug', 'Type/Feature',
'Priority/Critical', 'Priority/Medium',
'Complexity/Complex', 'Complexity/Medium',
'Component/Backend', 'Component/API', 'Component/Auth',
'Tech/FastAPI', 'Tech/PostgreSQL',
'Source/Production'
]
tools = _create_tools_with_labels(labels)
context = """
Urgent critical bug in production backend API service.
Need to fix broken authentication endpoint.
This is a complex issue requiring FastAPI and PostgreSQL expertise.
"""
suggestions = await tools.suggest_labels(context)
# Should have Type
assert any('Type/' in label for label in suggestions)
# Should have Priority
assert any('Priority/' in label for label in suggestions)
# Should have Component
assert any('Component/' in label for label in suggestions)
# Should have Tech
assert any('Tech/' in label for label in suggestions)
# Should have Source
assert any('Source/' in label for label in suggestions)
@pytest.mark.asyncio
async def test_suggest_labels_empty_repo():
"""Test suggestions when no repo specified and no labels available"""
mock_client = Mock()
mock_client.repo = None
tools = LabelTools(mock_client)
context = "Fix a bug"
suggestions = await tools.suggest_labels(context)
# Should return empty list when no repo
assert suggestions == []
@pytest.mark.asyncio
async def test_suggest_labels_no_matching_labels():
"""Test suggestions return empty when no matching labels exist"""
labels = ['Custom/Label', 'Other/Thing'] # No standard labels
tools = _create_tools_with_labels(labels)
context = "Fix a bug"
suggestions = await tools.suggest_labels(context)
# Should return empty list since no Type/Bug or similar exists
assert len(suggestions) == 0
@pytest.mark.asyncio
async def test_get_labels_org_owned_repo():
"""Test getting labels for organization-owned repository"""
mock_client = Mock()
mock_client.repo = 'myorg/myrepo'
mock_client.is_org_repo = Mock(return_value=True)
mock_client.get_org_labels = Mock(return_value=[
{'name': 'Type/Bug', 'id': 1},
{'name': 'Type/Feature', 'id': 2}
])
mock_client.get_labels = Mock(return_value=[
{'name': 'Component/Backend', 'id': 3}
])
tools = LabelTools(mock_client)
result = await tools.get_labels()
# Should fetch both org and repo labels
mock_client.is_org_repo.assert_called_once_with('myorg/myrepo')
mock_client.get_org_labels.assert_called_once_with('myorg')
mock_client.get_labels.assert_called_once_with('myorg/myrepo')
assert len(result['organization']) == 2
assert len(result['repository']) == 1
assert result['total_count'] == 3
@pytest.mark.asyncio
async def test_get_labels_user_owned_repo():
"""Test getting labels for user-owned repository (no org labels)"""
mock_client = Mock()
mock_client.repo = 'lmiranda/personal-portfolio'
mock_client.is_org_repo = Mock(return_value=False)
mock_client.get_labels = Mock(return_value=[
{'name': 'bug', 'id': 1},
{'name': 'enhancement', 'id': 2}
])
tools = LabelTools(mock_client)
result = await tools.get_labels()
# Should check if org repo
mock_client.is_org_repo.assert_called_once_with('lmiranda/personal-portfolio')
# Should NOT call get_org_labels for user-owned repos
mock_client.get_org_labels.assert_not_called()
# Should still get repo labels
mock_client.get_labels.assert_called_once_with('lmiranda/personal-portfolio')
assert len(result['organization']) == 0
assert len(result['repository']) == 2
assert result['total_count'] == 2