feat: implement Gitea MCP Server with full test coverage

Phase 1 implementation complete:
- Complete MCP server with 8 tools (list_issues, get_issue, create_issue, update_issue, add_comment, get_labels, suggest_labels, aggregate_issues)
- Hybrid configuration system (system-level + project-level)
- Branch-aware security model (main/staging/development)
- Mode detection (project vs company/PMO)
- Intelligent label suggestion (44-label taxonomy)
- 42 unit tests (100% passing)
- Comprehensive documentation (README.md, TESTING.md)

Files implemented:
- mcp_server/config.py - Configuration loader
- mcp_server/gitea_client.py - Gitea API client
- mcp_server/server.py - MCP server entry point
- mcp_server/tools/issues.py - Issue operations
- mcp_server/tools/labels.py - Label management
- tests/ - Complete test suite (42 tests)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
2025-11-06 16:23:52 -05:00
parent 4e25a6f9f5
commit e41c067d93
15 changed files with 2962 additions and 0 deletions

View File

View File

@@ -0,0 +1,102 @@
"""
Configuration loader for Gitea MCP Server.
Implements hybrid configuration system:
- System-level: ~/.config/claude/gitea.env (credentials)
- Project-level: .env (repository specification)
"""
from pathlib import Path
from dotenv import load_dotenv
import os
import logging
from typing import Dict, Optional
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class GiteaConfig:
"""Hybrid configuration loader with mode detection"""
def __init__(self):
self.api_url: Optional[str] = None
self.api_token: Optional[str] = None
self.owner: Optional[str] = None
self.repo: Optional[str] = None
self.mode: str = 'project'
def load(self) -> Dict[str, Optional[str]]:
"""
Load configuration from system and project levels.
Project-level configuration overrides system-level.
Returns:
Dict containing api_url, api_token, owner, repo, mode
Raises:
FileNotFoundError: If system config is missing
ValueError: If required configuration is missing
"""
# Load system config
system_config = Path.home() / '.config' / 'claude' / 'gitea.env'
if system_config.exists():
load_dotenv(system_config)
logger.info(f"Loaded system configuration from {system_config}")
else:
raise FileNotFoundError(
f"System config not found: {system_config}\n"
"Create it with: mkdir -p ~/.config/claude && "
"cat > ~/.config/claude/gitea.env"
)
# Load project config (overrides system)
project_config = Path.cwd() / '.env'
if project_config.exists():
load_dotenv(project_config, override=True)
logger.info(f"Loaded project configuration from {project_config}")
# Extract values
self.api_url = os.getenv('GITEA_API_URL')
self.api_token = os.getenv('GITEA_API_TOKEN')
self.owner = os.getenv('GITEA_OWNER')
self.repo = os.getenv('GITEA_REPO') # Optional for PMO
# Detect mode
if self.repo:
self.mode = 'project'
logger.info(f"Running in project mode: {self.repo}")
else:
self.mode = 'company'
logger.info("Running in company-wide mode (PMO)")
# Validate required variables
self._validate()
return {
'api_url': self.api_url,
'api_token': self.api_token,
'owner': self.owner,
'repo': self.repo,
'mode': self.mode
}
def _validate(self) -> None:
"""
Validate that required configuration is present.
Raises:
ValueError: If required configuration is missing
"""
required = {
'GITEA_API_URL': self.api_url,
'GITEA_API_TOKEN': self.api_token,
'GITEA_OWNER': self.owner
}
missing = [key for key, value in required.items() if not value]
if missing:
raise ValueError(
f"Missing required configuration: {', '.join(missing)}\n"
"Check your ~/.config/claude/gitea.env file"
)

View File

@@ -0,0 +1,328 @@
"""
Gitea API client for interacting with Gitea API.
Provides synchronous methods for:
- Issue CRUD operations
- Label management
- Repository operations
- PMO multi-repo aggregation
"""
import requests
import logging
from typing import List, Dict, Optional
from .config import GiteaConfig
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class GiteaClient:
"""Client for interacting with Gitea API"""
def __init__(self):
"""Initialize Gitea client with configuration"""
config = GiteaConfig()
config_dict = config.load()
self.base_url = config_dict['api_url']
self.token = config_dict['api_token']
self.owner = config_dict['owner']
self.repo = config_dict.get('repo') # Optional for PMO
self.mode = config_dict['mode']
self.session = requests.Session()
self.session.headers.update({
'Authorization': f'token {self.token}',
'Content-Type': 'application/json'
})
logger.info(f"Gitea client initialized for {self.owner} in {self.mode} mode")
def list_issues(
self,
state: str = 'open',
labels: Optional[List[str]] = None,
repo: Optional[str] = None
) -> List[Dict]:
"""
List issues from Gitea repository.
Args:
state: Issue state (open, closed, all)
labels: Filter by labels
repo: Override configured repo (for PMO multi-repo)
Returns:
List of issue dictionaries
Raises:
ValueError: If repository not specified
requests.HTTPError: If API request fails
"""
target_repo = repo or self.repo
if not target_repo:
raise ValueError("Repository not specified")
url = f"{self.base_url}/repos/{self.owner}/{target_repo}/issues"
params = {'state': state}
if labels:
params['labels'] = ','.join(labels)
logger.info(f"Listing issues from {self.owner}/{target_repo} with state={state}")
response = self.session.get(url, params=params)
response.raise_for_status()
return response.json()
def get_issue(
self,
issue_number: int,
repo: Optional[str] = None
) -> Dict:
"""
Get specific issue details.
Args:
issue_number: Issue number
repo: Override configured repo (for PMO multi-repo)
Returns:
Issue dictionary
Raises:
ValueError: If repository not specified
requests.HTTPError: If API request fails
"""
target_repo = repo or self.repo
if not target_repo:
raise ValueError("Repository not specified")
url = f"{self.base_url}/repos/{self.owner}/{target_repo}/issues/{issue_number}"
logger.info(f"Getting issue #{issue_number} from {self.owner}/{target_repo}")
response = self.session.get(url)
response.raise_for_status()
return response.json()
def create_issue(
self,
title: str,
body: str,
labels: Optional[List[str]] = None,
repo: Optional[str] = None
) -> Dict:
"""
Create a new issue in Gitea.
Args:
title: Issue title
body: Issue description
labels: List of label names
repo: Override configured repo (for PMO multi-repo)
Returns:
Created issue dictionary
Raises:
ValueError: If repository not specified
requests.HTTPError: If API request fails
"""
target_repo = repo or self.repo
if not target_repo:
raise ValueError("Repository not specified")
url = f"{self.base_url}/repos/{self.owner}/{target_repo}/issues"
data = {
'title': title,
'body': body
}
if labels:
data['labels'] = labels
logger.info(f"Creating issue in {self.owner}/{target_repo}: {title}")
response = self.session.post(url, json=data)
response.raise_for_status()
return response.json()
def update_issue(
self,
issue_number: int,
title: Optional[str] = None,
body: Optional[str] = None,
state: Optional[str] = None,
labels: Optional[List[str]] = None,
repo: Optional[str] = None
) -> Dict:
"""
Update existing issue.
Args:
issue_number: Issue number
title: New title (optional)
body: New body (optional)
state: New state - 'open' or 'closed' (optional)
labels: New labels (optional)
repo: Override configured repo (for PMO multi-repo)
Returns:
Updated issue dictionary
Raises:
ValueError: If repository not specified
requests.HTTPError: If API request fails
"""
target_repo = repo or self.repo
if not target_repo:
raise ValueError("Repository not specified")
url = f"{self.base_url}/repos/{self.owner}/{target_repo}/issues/{issue_number}"
data = {}
if title is not None:
data['title'] = title
if body is not None:
data['body'] = body
if state is not None:
data['state'] = state
if labels is not None:
data['labels'] = labels
logger.info(f"Updating issue #{issue_number} in {self.owner}/{target_repo}")
response = self.session.patch(url, json=data)
response.raise_for_status()
return response.json()
def add_comment(
self,
issue_number: int,
comment: str,
repo: Optional[str] = None
) -> Dict:
"""
Add comment to issue.
Args:
issue_number: Issue number
comment: Comment text
repo: Override configured repo (for PMO multi-repo)
Returns:
Created comment dictionary
Raises:
ValueError: If repository not specified
requests.HTTPError: If API request fails
"""
target_repo = repo or self.repo
if not target_repo:
raise ValueError("Repository not specified")
url = f"{self.base_url}/repos/{self.owner}/{target_repo}/issues/{issue_number}/comments"
data = {'body': comment}
logger.info(f"Adding comment to issue #{issue_number} in {self.owner}/{target_repo}")
response = self.session.post(url, json=data)
response.raise_for_status()
return response.json()
def get_labels(
self,
repo: Optional[str] = None
) -> List[Dict]:
"""
Get all labels from repository.
Args:
repo: Override configured repo (for PMO multi-repo)
Returns:
List of label dictionaries
Raises:
ValueError: If repository not specified
requests.HTTPError: If API request fails
"""
target_repo = repo or self.repo
if not target_repo:
raise ValueError("Repository not specified")
url = f"{self.base_url}/repos/{self.owner}/{target_repo}/labels"
logger.info(f"Getting labels from {self.owner}/{target_repo}")
response = self.session.get(url)
response.raise_for_status()
return response.json()
def get_org_labels(self) -> List[Dict]:
"""
Get organization-level labels.
Returns:
List of organization label dictionaries
Raises:
requests.HTTPError: If API request fails
"""
url = f"{self.base_url}/orgs/{self.owner}/labels"
logger.info(f"Getting organization labels for {self.owner}")
response = self.session.get(url)
response.raise_for_status()
return response.json()
# PMO-specific methods
def list_repos(self) -> List[Dict]:
"""
List all repositories in organization (PMO mode).
Returns:
List of repository dictionaries
Raises:
requests.HTTPError: If API request fails
"""
url = f"{self.base_url}/orgs/{self.owner}/repos"
logger.info(f"Listing all repositories for organization {self.owner}")
response = self.session.get(url)
response.raise_for_status()
return response.json()
def aggregate_issues(
self,
state: str = 'open',
labels: Optional[List[str]] = None
) -> Dict[str, List[Dict]]:
"""
Fetch issues across all repositories (PMO mode).
Returns dict keyed by repository name.
Args:
state: Issue state (open, closed, all)
labels: Filter by labels
Returns:
Dictionary mapping repository names to issue lists
Raises:
requests.HTTPError: If API request fails
"""
repos = self.list_repos()
aggregated = {}
logger.info(f"Aggregating issues across {len(repos)} repositories")
for repo in repos:
repo_name = repo['name']
try:
issues = self.list_issues(
state=state,
labels=labels,
repo=repo_name
)
if issues:
aggregated[repo_name] = issues
logger.info(f"Found {len(issues)} issues in {repo_name}")
except Exception as e:
# Log error but continue with other repos
logger.error(f"Error fetching issues from {repo_name}: {e}")
return aggregated

View File

@@ -0,0 +1,300 @@
"""
MCP Server entry point for Gitea integration.
Provides Gitea tools to Claude Code via JSON-RPC 2.0 over stdio.
"""
import asyncio
import logging
import json
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
from .config import GiteaConfig
from .gitea_client import GiteaClient
from .tools.issues import IssueTools
from .tools.labels import LabelTools
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class GiteaMCPServer:
"""MCP Server for Gitea integration"""
def __init__(self):
self.server = Server("gitea-mcp")
self.config = None
self.client = None
self.issue_tools = None
self.label_tools = None
async def initialize(self):
"""
Initialize server and load configuration.
Raises:
Exception: If initialization fails
"""
try:
config_loader = GiteaConfig()
self.config = config_loader.load()
self.client = GiteaClient()
self.issue_tools = IssueTools(self.client)
self.label_tools = LabelTools(self.client)
logger.info(f"Gitea MCP Server initialized in {self.config['mode']} mode")
except Exception as e:
logger.error(f"Failed to initialize: {e}")
raise
def setup_tools(self):
"""Register all available tools with the MCP server"""
@self.server.list_tools()
async def list_tools() -> list[Tool]:
"""Return list of available tools"""
return [
Tool(
name="list_issues",
description="List issues from Gitea repository",
inputSchema={
"type": "object",
"properties": {
"state": {
"type": "string",
"enum": ["open", "closed", "all"],
"default": "open",
"description": "Issue state filter"
},
"labels": {
"type": "array",
"items": {"type": "string"},
"description": "Filter by labels"
},
"repo": {
"type": "string",
"description": "Repository name (for PMO mode)"
}
}
}
),
Tool(
name="get_issue",
description="Get specific issue details",
inputSchema={
"type": "object",
"properties": {
"issue_number": {
"type": "integer",
"description": "Issue number"
},
"repo": {
"type": "string",
"description": "Repository name (for PMO mode)"
}
},
"required": ["issue_number"]
}
),
Tool(
name="create_issue",
description="Create a new issue in Gitea",
inputSchema={
"type": "object",
"properties": {
"title": {
"type": "string",
"description": "Issue title"
},
"body": {
"type": "string",
"description": "Issue description"
},
"labels": {
"type": "array",
"items": {"type": "string"},
"description": "List of label names"
},
"repo": {
"type": "string",
"description": "Repository name (for PMO mode)"
}
},
"required": ["title", "body"]
}
),
Tool(
name="update_issue",
description="Update existing issue",
inputSchema={
"type": "object",
"properties": {
"issue_number": {
"type": "integer",
"description": "Issue number"
},
"title": {
"type": "string",
"description": "New title"
},
"body": {
"type": "string",
"description": "New body"
},
"state": {
"type": "string",
"enum": ["open", "closed"],
"description": "New state"
},
"labels": {
"type": "array",
"items": {"type": "string"},
"description": "New labels"
},
"repo": {
"type": "string",
"description": "Repository name (for PMO mode)"
}
},
"required": ["issue_number"]
}
),
Tool(
name="add_comment",
description="Add comment to issue",
inputSchema={
"type": "object",
"properties": {
"issue_number": {
"type": "integer",
"description": "Issue number"
},
"comment": {
"type": "string",
"description": "Comment text"
},
"repo": {
"type": "string",
"description": "Repository name (for PMO mode)"
}
},
"required": ["issue_number", "comment"]
}
),
Tool(
name="get_labels",
description="Get all available labels (org + repo)",
inputSchema={
"type": "object",
"properties": {
"repo": {
"type": "string",
"description": "Repository name (for PMO mode)"
}
}
}
),
Tool(
name="suggest_labels",
description="Analyze context and suggest appropriate labels",
inputSchema={
"type": "object",
"properties": {
"context": {
"type": "string",
"description": "Issue title + description or sprint context"
}
},
"required": ["context"]
}
),
Tool(
name="aggregate_issues",
description="Fetch issues across all repositories (PMO mode)",
inputSchema={
"type": "object",
"properties": {
"state": {
"type": "string",
"enum": ["open", "closed", "all"],
"default": "open",
"description": "Issue state filter"
},
"labels": {
"type": "array",
"items": {"type": "string"},
"description": "Filter by labels"
}
}
}
)
]
@self.server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
"""
Handle tool invocation.
Args:
name: Tool name
arguments: Tool arguments
Returns:
List of TextContent with results
"""
try:
# Route to appropriate tool handler
if name == "list_issues":
result = await self.issue_tools.list_issues(**arguments)
elif name == "get_issue":
result = await self.issue_tools.get_issue(**arguments)
elif name == "create_issue":
result = await self.issue_tools.create_issue(**arguments)
elif name == "update_issue":
result = await self.issue_tools.update_issue(**arguments)
elif name == "add_comment":
result = await self.issue_tools.add_comment(**arguments)
elif name == "get_labels":
result = await self.label_tools.get_labels(**arguments)
elif name == "suggest_labels":
result = await self.label_tools.suggest_labels(**arguments)
elif name == "aggregate_issues":
result = await self.issue_tools.aggregate_issues(**arguments)
else:
raise ValueError(f"Unknown tool: {name}")
return [TextContent(
type="text",
text=json.dumps(result, indent=2)
)]
except Exception as e:
logger.error(f"Tool {name} failed: {e}")
return [TextContent(
type="text",
text=f"Error: {str(e)}"
)]
async def run(self):
"""Run the MCP server"""
await self.initialize()
self.setup_tools()
async with stdio_server() as (read_stream, write_stream):
await self.server.run(
read_stream,
write_stream,
self.server.create_initialization_options()
)
async def main():
"""Main entry point"""
server = GiteaMCPServer()
await server.run()
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,7 @@
"""
MCP tools for Gitea integration.
This package provides MCP tool implementations for:
- Issue operations (issues.py)
- Label management (labels.py)
"""

View File

@@ -0,0 +1,279 @@
"""
Issue management tools for MCP server.
Provides async wrappers for issue CRUD operations with:
- Branch-aware security
- PMO multi-repo support
- Comprehensive error handling
"""
import asyncio
import subprocess
import logging
from typing import List, Dict, Optional
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class IssueTools:
"""Async wrappers for Gitea issue operations with branch detection"""
def __init__(self, gitea_client):
"""
Initialize issue tools.
Args:
gitea_client: GiteaClient instance
"""
self.gitea = gitea_client
def _get_current_branch(self) -> str:
"""
Get current git branch.
Returns:
Current branch name or 'unknown' if not in a git repo
"""
try:
result = subprocess.run(
['git', 'rev-parse', '--abbrev-ref', 'HEAD'],
capture_output=True,
text=True,
check=True
)
return result.stdout.strip()
except subprocess.CalledProcessError:
return "unknown"
def _check_branch_permissions(self, operation: str) -> bool:
"""
Check if operation is allowed on current branch.
Args:
operation: Operation name (list_issues, create_issue, etc.)
Returns:
True if operation is allowed, False otherwise
"""
branch = self._get_current_branch()
# Production branches (read-only except incidents)
if branch in ['main', 'master'] or branch.startswith('prod/'):
return operation in ['list_issues', 'get_issue', 'get_labels']
# Staging branches (read-only for code)
if branch == 'staging' or branch.startswith('stage/'):
return operation in ['list_issues', 'get_issue', 'get_labels', 'create_issue']
# Development branches (full access)
if branch in ['development', 'develop'] or branch.startswith(('feat/', 'feature/', 'dev/')):
return True
# Unknown branch - be restrictive
return False
async def list_issues(
self,
state: str = 'open',
labels: Optional[List[str]] = None,
repo: Optional[str] = None
) -> List[Dict]:
"""
List issues from repository (async wrapper).
Args:
state: Issue state (open, closed, all)
labels: Filter by labels
repo: Override configured repo (for PMO multi-repo)
Returns:
List of issue dictionaries
Raises:
PermissionError: If operation not allowed on current branch
"""
if not self._check_branch_permissions('list_issues'):
branch = self._get_current_branch()
raise PermissionError(
f"Cannot list issues on branch '{branch}'. "
f"Switch to a development branch."
)
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None,
lambda: self.gitea.list_issues(state, labels, repo)
)
async def get_issue(
self,
issue_number: int,
repo: Optional[str] = None
) -> Dict:
"""
Get specific issue details (async wrapper).
Args:
issue_number: Issue number
repo: Override configured repo (for PMO multi-repo)
Returns:
Issue dictionary
Raises:
PermissionError: If operation not allowed on current branch
"""
if not self._check_branch_permissions('get_issue'):
branch = self._get_current_branch()
raise PermissionError(
f"Cannot get issue on branch '{branch}'. "
f"Switch to a development branch."
)
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None,
lambda: self.gitea.get_issue(issue_number, repo)
)
async def create_issue(
self,
title: str,
body: str,
labels: Optional[List[str]] = None,
repo: Optional[str] = None
) -> Dict:
"""
Create new issue (async wrapper with branch check).
Args:
title: Issue title
body: Issue description
labels: List of label names
repo: Override configured repo (for PMO multi-repo)
Returns:
Created issue dictionary
Raises:
PermissionError: If operation not allowed on current branch
"""
if not self._check_branch_permissions('create_issue'):
branch = self._get_current_branch()
raise PermissionError(
f"Cannot create issues on branch '{branch}'. "
f"Switch to a development branch to create issues."
)
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None,
lambda: self.gitea.create_issue(title, body, labels, repo)
)
async def update_issue(
self,
issue_number: int,
title: Optional[str] = None,
body: Optional[str] = None,
state: Optional[str] = None,
labels: Optional[List[str]] = None,
repo: Optional[str] = None
) -> Dict:
"""
Update existing issue (async wrapper with branch check).
Args:
issue_number: Issue number
title: New title (optional)
body: New body (optional)
state: New state - 'open' or 'closed' (optional)
labels: New labels (optional)
repo: Override configured repo (for PMO multi-repo)
Returns:
Updated issue dictionary
Raises:
PermissionError: If operation not allowed on current branch
"""
if not self._check_branch_permissions('update_issue'):
branch = self._get_current_branch()
raise PermissionError(
f"Cannot update issues on branch '{branch}'. "
f"Switch to a development branch to update issues."
)
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None,
lambda: self.gitea.update_issue(issue_number, title, body, state, labels, repo)
)
async def add_comment(
self,
issue_number: int,
comment: str,
repo: Optional[str] = None
) -> Dict:
"""
Add comment to issue (async wrapper with branch check).
Args:
issue_number: Issue number
comment: Comment text
repo: Override configured repo (for PMO multi-repo)
Returns:
Created comment dictionary
Raises:
PermissionError: If operation not allowed on current branch
"""
if not self._check_branch_permissions('add_comment'):
branch = self._get_current_branch()
raise PermissionError(
f"Cannot add comments on branch '{branch}'. "
f"Switch to a development branch to add comments."
)
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None,
lambda: self.gitea.add_comment(issue_number, comment, repo)
)
async def aggregate_issues(
self,
state: str = 'open',
labels: Optional[List[str]] = None
) -> Dict[str, List[Dict]]:
"""
Aggregate issues across all repositories (PMO mode, async wrapper).
Args:
state: Issue state (open, closed, all)
labels: Filter by labels
Returns:
Dictionary mapping repository names to issue lists
Raises:
ValueError: If not in company mode
PermissionError: If operation not allowed on current branch
"""
if self.gitea.mode != 'company':
raise ValueError("aggregate_issues only available in company mode")
if not self._check_branch_permissions('aggregate_issues'):
branch = self._get_current_branch()
raise PermissionError(
f"Cannot aggregate issues on branch '{branch}'. "
f"Switch to a development branch."
)
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None,
lambda: self.gitea.aggregate_issues(state, labels)
)

View File

@@ -0,0 +1,165 @@
"""
Label management tools for MCP server.
Provides async wrappers for label operations with:
- Label taxonomy retrieval
- Intelligent label suggestion
- Dynamic label detection
"""
import asyncio
import logging
from typing import List, Dict, Optional
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class LabelTools:
"""Async wrappers for Gitea label operations"""
def __init__(self, gitea_client):
"""
Initialize label tools.
Args:
gitea_client: GiteaClient instance
"""
self.gitea = gitea_client
async def get_labels(self, repo: Optional[str] = None) -> Dict[str, List[Dict]]:
"""
Get all labels (org + repo) (async wrapper).
Args:
repo: Override configured repo (for PMO multi-repo)
Returns:
Dictionary with 'org' and 'repo' label lists
"""
loop = asyncio.get_event_loop()
# Get org labels
org_labels = await loop.run_in_executor(
None,
self.gitea.get_org_labels
)
# Get repo labels if repo is specified
repo_labels = []
if repo or self.gitea.repo:
target_repo = repo or self.gitea.repo
repo_labels = await loop.run_in_executor(
None,
lambda: self.gitea.get_labels(target_repo)
)
return {
'organization': org_labels,
'repository': repo_labels,
'total_count': len(org_labels) + len(repo_labels)
}
async def suggest_labels(self, context: str) -> List[str]:
"""
Analyze context and suggest appropriate labels.
Args:
context: Issue title + description or sprint context
Returns:
List of suggested label names
"""
suggested = []
context_lower = context.lower()
# Type detection (exclusive - only one)
if any(word in context_lower for word in ['bug', 'error', 'fix', 'broken', 'crash', 'fail']):
suggested.append('Type/Bug')
elif any(word in context_lower for word in ['refactor', 'extract', 'restructure', 'architecture', 'service extraction']):
suggested.append('Type/Refactor')
elif any(word in context_lower for word in ['feature', 'add', 'implement', 'new', 'create']):
suggested.append('Type/Feature')
elif any(word in context_lower for word in ['docs', 'documentation', 'readme', 'guide']):
suggested.append('Type/Documentation')
elif any(word in context_lower for word in ['test', 'testing', 'spec', 'coverage']):
suggested.append('Type/Test')
elif any(word in context_lower for word in ['chore', 'maintenance', 'update', 'upgrade']):
suggested.append('Type/Chore')
# Priority detection
if any(word in context_lower for word in ['critical', 'urgent', 'blocker', 'blocking', 'emergency']):
suggested.append('Priority/Critical')
elif any(word in context_lower for word in ['high', 'important', 'asap', 'soon']):
suggested.append('Priority/High')
elif any(word in context_lower for word in ['low', 'nice-to-have', 'optional', 'later']):
suggested.append('Priority/Low')
else:
suggested.append('Priority/Medium')
# Complexity detection
if any(word in context_lower for word in ['simple', 'trivial', 'easy', 'quick']):
suggested.append('Complexity/Simple')
elif any(word in context_lower for word in ['complex', 'difficult', 'challenging', 'intricate']):
suggested.append('Complexity/Complex')
else:
suggested.append('Complexity/Medium')
# Efforts detection
if any(word in context_lower for word in ['xs', 'tiny', '1 hour', '2 hours']):
suggested.append('Efforts/XS')
elif any(word in context_lower for word in ['small', 's ', '1 day', 'half day']):
suggested.append('Efforts/S')
elif any(word in context_lower for word in ['medium', 'm ', '2 days', '3 days']):
suggested.append('Efforts/M')
elif any(word in context_lower for word in ['large', 'l ', '1 week', '5 days']):
suggested.append('Efforts/L')
elif any(word in context_lower for word in ['xl', 'extra large', '2 weeks', 'sprint']):
suggested.append('Efforts/XL')
# Component detection (based on keywords)
component_keywords = {
'Component/Backend': ['backend', 'server', 'api', 'database', 'service'],
'Component/Frontend': ['frontend', 'ui', 'interface', 'react', 'vue', 'component'],
'Component/API': ['api', 'endpoint', 'rest', 'graphql', 'route'],
'Component/Database': ['database', 'db', 'sql', 'migration', 'schema', 'postgres'],
'Component/Auth': ['auth', 'authentication', 'login', 'oauth', 'token', 'session'],
'Component/Deploy': ['deploy', 'deployment', 'docker', 'kubernetes', 'ci/cd'],
'Component/Testing': ['test', 'testing', 'spec', 'jest', 'pytest', 'coverage'],
'Component/Docs': ['docs', 'documentation', 'readme', 'guide', 'wiki']
}
for label, keywords in component_keywords.items():
if any(keyword in context_lower for keyword in keywords):
suggested.append(label)
# Tech stack detection
tech_keywords = {
'Tech/Python': ['python', 'fastapi', 'django', 'flask', 'pytest'],
'Tech/JavaScript': ['javascript', 'js', 'node', 'npm', 'yarn'],
'Tech/Docker': ['docker', 'dockerfile', 'container', 'compose'],
'Tech/PostgreSQL': ['postgres', 'postgresql', 'psql', 'sql'],
'Tech/Redis': ['redis', 'cache', 'session store'],
'Tech/Vue': ['vue', 'vuejs', 'nuxt'],
'Tech/FastAPI': ['fastapi', 'pydantic', 'starlette']
}
for label, keywords in tech_keywords.items():
if any(keyword in context_lower for keyword in keywords):
suggested.append(label)
# Source detection (based on git branch or context)
if 'development' in context_lower or 'dev/' in context_lower:
suggested.append('Source/Development')
elif 'staging' in context_lower or 'stage/' in context_lower:
suggested.append('Source/Staging')
elif 'production' in context_lower or 'prod' in context_lower:
suggested.append('Source/Production')
# Risk detection
if any(word in context_lower for word in ['breaking', 'breaking change', 'major', 'risky']):
suggested.append('Risk/High')
elif any(word in context_lower for word in ['safe', 'low risk', 'minor']):
suggested.append('Risk/Low')
logger.info(f"Suggested {len(suggested)} labels based on context")
return suggested