The suggest_labels function now dynamically detects the label naming convention used in the repository (slash format like Type/Bug or colon-space format like Type: Bug) instead of hardcoding the slash format.

Changes:
- Added _build_label_lookup() to parse and normalize label formats
- Added _find_label() to find actual labels from the lookup
- Updated suggest_labels() to accept an optional repo parameter
- Labels are fetched first, then suggestions are matched against the actual names
- Supports Efforts/Effort normalization (handles singular/plural)

Fixes issue #73, sub-issue 3: label format mismatch.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
262 lines
11 KiB
Python
262 lines
11 KiB
Python
"""
|
|
Label management tools for MCP server.
|
|
|
|
Provides async wrappers for label operations with:
|
|
- Label taxonomy retrieval
|
|
- Intelligent label suggestion
|
|
- Dynamic label detection
|
|
"""
|
|
import asyncio
|
|
import logging
|
|
import re
|
|
from typing import List, Dict, Optional
|
|
|
|
# Configure logging when the module is imported and create the module logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class LabelTools:
    """Async wrappers for Gitea label operations.

    Provides retrieval of the labels visible to a repository and a
    keyword-driven label suggester that only ever returns label names which
    actually exist in that repository, whatever separator convention
    (``Type/Bug``, ``Type: Bug``, ``Type:Bug``) the repository uses.
    """

    # Splits "Category<sep>Value" on the first "/" or ":"; surrounding
    # whitespace is trimmed afterwards in _build_label_lookup().
    _LABEL_PATTERN = re.compile(r'^([^/:]+)[/:](.+)$')

    # Ordered suggestion rules. Each entry is
    #   (category, default_value, exclusive, ((value, keywords), ...)).
    # exclusive=True  -> only the first rule whose keywords match is used, and
    #                    default_value (if any) is used when none match.
    # exclusive=False -> every matching rule contributes a label.
    # The order of entries is the order of labels in the returned list.
    _LABEL_RULES = (
        ('type', None, True, (
            ('bug', ('bug', 'error', 'fix', 'broken', 'crash', 'fail')),
            ('refactor', ('refactor', 'extract', 'restructure', 'architecture',
                          'service extraction')),
            ('feature', ('feature', 'add', 'implement', 'new', 'create')),
            ('documentation', ('docs', 'documentation', 'readme', 'guide')),
            ('test', ('test', 'testing', 'spec', 'coverage')),
            ('chore', ('chore', 'maintenance', 'update', 'upgrade')),
        )),
        ('priority', 'medium', True, (
            ('critical', ('critical', 'urgent', 'blocker', 'blocking', 'emergency')),
            ('high', ('high', 'important', 'asap', 'soon')),
            ('low', ('low', 'nice-to-have', 'optional', 'later')),
        )),
        ('complexity', 'medium', True, (
            ('simple', ('simple', 'trivial', 'easy', 'quick')),
            ('complex', ('complex', 'difficult', 'challenging', 'intricate')),
        )),
        # Category is written "effort"; "Efforts/..." labels are found too
        # because categories are normalized to singular form.
        ('effort', None, True, (
            ('xs', ('xs', 'tiny', '1 hour', '2 hours')),
            ('s', ('small', 's', '1 day', 'half day')),
            ('m', ('medium', 'm', '2 days', '3 days')),
            # XL is tested before L because "extra large" contains "large".
            ('xl', ('xl', 'extra large', '2 weeks', 'sprint')),
            ('l', ('large', 'l', '1 week', '5 days')),
        )),
        ('component', None, False, (
            ('backend', ('backend', 'server', 'api', 'database', 'service')),
            ('frontend', ('frontend', 'ui', 'interface', 'react', 'vue', 'component')),
            ('api', ('api', 'endpoint', 'rest', 'graphql', 'route')),
            ('database', ('database', 'db', 'sql', 'migration', 'schema', 'postgres')),
            ('auth', ('auth', 'authentication', 'login', 'oauth', 'token', 'session')),
            ('deploy', ('deploy', 'deployment', 'docker', 'kubernetes', 'ci/cd')),
            ('testing', ('test', 'testing', 'spec', 'jest', 'pytest', 'coverage')),
            ('docs', ('docs', 'documentation', 'readme', 'guide', 'wiki')),
        )),
        ('tech', None, False, (
            ('python', ('python', 'fastapi', 'django', 'flask', 'pytest')),
            ('javascript', ('javascript', 'js', 'node', 'npm', 'yarn')),
            ('docker', ('docker', 'dockerfile', 'container', 'compose')),
            ('postgresql', ('postgres', 'postgresql', 'psql', 'sql')),
            ('redis', ('redis', 'cache', 'session store')),
            ('vue', ('vue', 'vuejs', 'nuxt')),
            ('fastapi', ('fastapi', 'pydantic', 'starlette')),
        )),
        ('source', None, True, (
            ('development', ('development', 'dev/')),
            ('staging', ('staging', 'stage/')),
            ('production', ('production', 'prod')),
        )),
        ('risk', None, True, (
            ('high', ('breaking', 'breaking change', 'major', 'risky')),
            ('low', ('safe', 'low risk', 'minor')),
        )),
    )

    def __init__(self, gitea_client):
        """
        Initialize label tools.

        Args:
            gitea_client: GiteaClient instance. This class reads its ``repo``
                attribute and calls its ``is_org_repo``, ``get_org_labels``
                and ``get_labels`` methods.
        """
        self.gitea = gitea_client

    async def get_labels(self, repo: Optional[str] = None) -> Dict[str, List[Dict]]:
        """
        Get all labels (org + repo if org-owned, repo-only if user-owned).

        Args:
            repo: Repository in 'owner/repo' format; falls back to the
                client's default ``repo`` when omitted.

        Returns:
            Dict with keys 'organization' (list, empty for non-org repos),
            'repository' (list) and 'total_count' (sum of both lengths).

        Raises:
            ValueError: If no repository is available or it lacks a '/'.
        """
        target_repo = repo or self.gitea.repo
        if not target_repo or '/' not in target_repo:
            raise ValueError("Use 'owner/repo' format (e.g. 'org/repo-name')")

        # The client calls are run in the default executor so they do not
        # block the event loop (presumably they perform blocking HTTP I/O).
        loop = asyncio.get_running_loop()

        # Check if repo belongs to an organization or user
        is_org = await loop.run_in_executor(None, self.gitea.is_org_repo, target_repo)

        org_labels = []
        if is_org:
            org = target_repo.split('/')[0]
            org_labels = await loop.run_in_executor(None, self.gitea.get_org_labels, org)

        repo_labels = await loop.run_in_executor(None, self.gitea.get_labels, target_repo)

        return {
            'organization': org_labels,
            'repository': repo_labels,
            'total_count': len(org_labels) + len(repo_labels),
        }

    async def suggest_labels(self, context: str, repo: Optional[str] = None) -> List[str]:
        """
        Analyze context and suggest appropriate labels from repository's actual labels.

        The repository's labels are fetched first and suggestions are matched
        against them, so any naming convention (slash, colon-space, colon) is
        supported and only existing label names are returned.

        Keywords are matched case-insensitively at the start of a word, so
        'fix' matches "fixes" but not "prefix". Keywords of one or two
        characters (size codes such as 's' or 'xl', abbreviations such as
        'ui') must appear as standalone words.

        Args:
            context: Issue title + description or sprint context
            repo: Repository in 'owner/repo' format (optional, uses default if not provided)

        Returns:
            List of suggested label names that exist in the repository, in
            rule order (type, priority, complexity, effort, components, tech,
            source, risk). Empty when no repository is known or the labels
            cannot be fetched.
        """
        target_repo = repo or self.gitea.repo
        if not target_repo:
            logger.warning("No repository specified, returning empty suggestions")
            return []

        # Best effort: any failure while fetching labels yields no suggestions
        # rather than an error, because nothing can be matched without labels.
        try:
            labels_data = await self.get_labels(target_repo)
            all_labels = labels_data.get('organization', []) + labels_data.get('repository', [])
            label_names = [label['name'] for label in all_labels]
        except Exception as e:
            logger.warning("Failed to fetch labels: %s. Returning no suggestions.", e)
            return []

        label_lookup = self._build_label_lookup(label_names)
        context_lower = (context or '').lower()

        suggested: List[str] = []
        for category, default, exclusive, rules in self._LABEL_RULES:
            matches = (
                value for value, keywords in rules
                if self._mentions(context_lower, keywords)
            )
            if exclusive:
                # The first matching rule decides; there is no fall-through to
                # later rules when the repository lacks that label.
                chosen = next(matches, default)
                values = [] if chosen is None else [chosen]
            else:
                values = list(matches)

            for value in values:
                label = self._find_label(label_lookup, category, value)
                if label and label not in suggested:
                    suggested.append(label)

        logger.info(
            "Suggested %d labels based on context and %d available labels",
            len(suggested), len(label_names),
        )
        return suggested

    @staticmethod
    def _mentions(text: str, keywords) -> bool:
        """Return True when any keyword starts a word in the lowercased text."""
        for keyword in keywords:
            token = keyword.strip()
            if not token:
                continue
            # Must not be preceded by a letter or digit ("prefix" != "fix");
            # underscores count as separators so "unit_test" matches "test".
            pattern = r'(?<![^\W_])' + re.escape(token)
            if len(token) <= 2:
                # Short codes must stand alone, and must not be the tail of a
                # contraction such as "it's" or "i'm".
                pattern = r"(?<!['\u2019])" + pattern + r'(?![^\W_])'
            if re.search(pattern, text):
                return True
        return False

    @staticmethod
    def _normalize_category(category: str) -> str:
        """Lowercase and trim a category and drop one plural 's' ("Efforts" -> "effort")."""
        normalized = category.strip().lower()
        if len(normalized) > 1 and normalized.endswith('s'):
            normalized = normalized[:-1]
        return normalized

    def _build_label_lookup(self, label_names: List[str]) -> Dict[str, Dict[str, str]]:
        """
        Build a lookup dictionary for label matching.

        Supports various label formats:
        - Slash format: Type/Bug, Priority/High
        - Colon-space format: Type: Bug, Priority: High
        - Colon format: Type:Bug

        Whitespace around the category and the value is ignored. Labels
        without a separator, or with an empty category or value, are skipped.
        When two labels normalize to the same key, the later one in
        ``label_names`` wins.

        Args:
            label_names: List of actual label names from repository

        Returns:
            Nested dict: {category: {value: actual_label_name}} with
            lowercased keys and the original label names as values.
        """
        lookup: Dict[str, Dict[str, str]] = {}

        for label in label_names:
            if not isinstance(label, str):
                continue
            match = self._LABEL_PATTERN.match(label)
            if not match:
                continue

            category = self._normalize_category(match.group(1))
            value = match.group(2).strip().lower()
            if not category or not value:
                continue

            lookup.setdefault(category, {})[value] = label

        return lookup

    def _find_label(self, lookup: Dict[str, Dict[str, str]], category: str, value: str) -> Optional[str]:
        """
        Find actual label name from lookup.

        Args:
            lookup: Label lookup dictionary as built by _build_label_lookup()
            category: Category to search (e.g., 'type', 'priority')
            value: Value to find (e.g., 'bug', 'high')

        Returns:
            Actual label name if found, None otherwise
        """
        values = lookup.get(self._normalize_category(category), {})
        return values.get(value.strip().lower())
|