Fixes multiple issues from diagnostic #123: 1. create_label_smart type safety (labels.py) - Add isinstance(result, dict) checks after API calls - Return structured error dict if API returns unexpected type - Prevents "list indices must be integers" crash 2. debug-report always uses curl with labels - Remove MCP option - always use curl for marketplace issues - Add label ID fetching step (Source/Diagnostic, Type/Bug) - Include labels in curl POST payload - Avoids branch protection restrictions on main branch Fixes #123 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
378 lines
16 KiB
Python
378 lines
16 KiB
Python
"""
|
|
Label management tools for MCP server.
|
|
|
|
Provides async wrappers for label operations with:
|
|
- Label taxonomy retrieval
|
|
- Intelligent label suggestion
|
|
- Dynamic label detection
|
|
"""
|
|
import asyncio
|
|
import logging
|
|
import re
|
|
from typing import List, Dict, Optional
|
|
|
|
logging.basicConfig(level=logging.INFO)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class LabelTools:
|
|
"""Async wrappers for Gitea label operations"""
|
|
|
|
def __init__(self, gitea_client):
|
|
"""
|
|
Initialize label tools.
|
|
|
|
Args:
|
|
gitea_client: GiteaClient instance
|
|
"""
|
|
self.gitea = gitea_client
|
|
|
|
async def get_labels(self, repo: Optional[str] = None) -> Dict[str, List[Dict]]:
|
|
"""Get all labels (org + repo if org-owned, repo-only if user-owned)."""
|
|
loop = asyncio.get_event_loop()
|
|
|
|
target_repo = repo or self.gitea.repo
|
|
if not target_repo or '/' not in target_repo:
|
|
raise ValueError("Use 'owner/repo' format (e.g. 'org/repo-name')")
|
|
|
|
# Check if repo belongs to an organization or user
|
|
is_org = await loop.run_in_executor(
|
|
None,
|
|
lambda: self.gitea.is_org_repo(target_repo)
|
|
)
|
|
|
|
org_labels = []
|
|
if is_org:
|
|
org = target_repo.split('/')[0]
|
|
org_labels = await loop.run_in_executor(
|
|
None,
|
|
lambda: self.gitea.get_org_labels(org)
|
|
)
|
|
|
|
repo_labels = await loop.run_in_executor(
|
|
None,
|
|
lambda: self.gitea.get_labels(target_repo)
|
|
)
|
|
|
|
return {
|
|
'organization': org_labels,
|
|
'repository': repo_labels,
|
|
'total_count': len(org_labels) + len(repo_labels)
|
|
}
|
|
|
|
async def suggest_labels(self, context: str, repo: Optional[str] = None) -> List[str]:
|
|
"""
|
|
Analyze context and suggest appropriate labels from repository's actual labels.
|
|
|
|
This method fetches actual labels from the repository and matches them
|
|
dynamically, supporting any label naming convention (slash, colon-space, etc.).
|
|
|
|
Args:
|
|
context: Issue title + description or sprint context
|
|
repo: Repository in 'owner/repo' format (optional, uses default if not provided)
|
|
|
|
Returns:
|
|
List of suggested label names that exist in the repository
|
|
"""
|
|
# Fetch actual labels from repository
|
|
target_repo = repo or self.gitea.repo
|
|
if not target_repo:
|
|
logger.warning("No repository specified, returning empty suggestions")
|
|
return []
|
|
|
|
try:
|
|
labels_data = await self.get_labels(target_repo)
|
|
all_labels = labels_data.get('organization', []) + labels_data.get('repository', [])
|
|
label_names = [label['name'] for label in all_labels]
|
|
except Exception as e:
|
|
logger.warning(f"Failed to fetch labels: {e}. Using fallback suggestions.")
|
|
label_names = []
|
|
|
|
# Build label lookup for dynamic matching
|
|
label_lookup = self._build_label_lookup(label_names)
|
|
|
|
suggested = []
|
|
context_lower = context.lower()
|
|
|
|
# Type detection (exclusive - only one)
|
|
type_label = None
|
|
if any(word in context_lower for word in ['bug', 'error', 'fix', 'broken', 'crash', 'fail']):
|
|
type_label = self._find_label(label_lookup, 'type', 'bug')
|
|
elif any(word in context_lower for word in ['refactor', 'extract', 'restructure', 'architecture', 'service extraction']):
|
|
type_label = self._find_label(label_lookup, 'type', 'refactor')
|
|
elif any(word in context_lower for word in ['feature', 'add', 'implement', 'new', 'create']):
|
|
type_label = self._find_label(label_lookup, 'type', 'feature')
|
|
elif any(word in context_lower for word in ['docs', 'documentation', 'readme', 'guide']):
|
|
type_label = self._find_label(label_lookup, 'type', 'documentation')
|
|
elif any(word in context_lower for word in ['test', 'testing', 'spec', 'coverage']):
|
|
type_label = self._find_label(label_lookup, 'type', 'test')
|
|
elif any(word in context_lower for word in ['chore', 'maintenance', 'update', 'upgrade']):
|
|
type_label = self._find_label(label_lookup, 'type', 'chore')
|
|
if type_label:
|
|
suggested.append(type_label)
|
|
|
|
# Priority detection
|
|
priority_label = None
|
|
if any(word in context_lower for word in ['critical', 'urgent', 'blocker', 'blocking', 'emergency']):
|
|
priority_label = self._find_label(label_lookup, 'priority', 'critical')
|
|
elif any(word in context_lower for word in ['high', 'important', 'asap', 'soon']):
|
|
priority_label = self._find_label(label_lookup, 'priority', 'high')
|
|
elif any(word in context_lower for word in ['low', 'nice-to-have', 'optional', 'later']):
|
|
priority_label = self._find_label(label_lookup, 'priority', 'low')
|
|
else:
|
|
priority_label = self._find_label(label_lookup, 'priority', 'medium')
|
|
if priority_label:
|
|
suggested.append(priority_label)
|
|
|
|
# Complexity detection
|
|
complexity_label = None
|
|
if any(word in context_lower for word in ['simple', 'trivial', 'easy', 'quick']):
|
|
complexity_label = self._find_label(label_lookup, 'complexity', 'simple')
|
|
elif any(word in context_lower for word in ['complex', 'difficult', 'challenging', 'intricate']):
|
|
complexity_label = self._find_label(label_lookup, 'complexity', 'complex')
|
|
else:
|
|
complexity_label = self._find_label(label_lookup, 'complexity', 'medium')
|
|
if complexity_label:
|
|
suggested.append(complexity_label)
|
|
|
|
# Effort detection (supports both "Effort" and "Efforts" naming)
|
|
effort_label = None
|
|
if any(word in context_lower for word in ['xs', 'tiny', '1 hour', '2 hours']):
|
|
effort_label = self._find_label(label_lookup, 'effort', 'xs')
|
|
elif any(word in context_lower for word in ['small', 's ', '1 day', 'half day']):
|
|
effort_label = self._find_label(label_lookup, 'effort', 's')
|
|
elif any(word in context_lower for word in ['medium', 'm ', '2 days', '3 days']):
|
|
effort_label = self._find_label(label_lookup, 'effort', 'm')
|
|
elif any(word in context_lower for word in ['large', 'l ', '1 week', '5 days']):
|
|
effort_label = self._find_label(label_lookup, 'effort', 'l')
|
|
elif any(word in context_lower for word in ['xl', 'extra large', '2 weeks', 'sprint']):
|
|
effort_label = self._find_label(label_lookup, 'effort', 'xl')
|
|
if effort_label:
|
|
suggested.append(effort_label)
|
|
|
|
# Component detection (based on keywords)
|
|
component_mappings = {
|
|
'backend': ['backend', 'server', 'api', 'database', 'service'],
|
|
'frontend': ['frontend', 'ui', 'interface', 'react', 'vue', 'component'],
|
|
'api': ['api', 'endpoint', 'rest', 'graphql', 'route'],
|
|
'database': ['database', 'db', 'sql', 'migration', 'schema', 'postgres'],
|
|
'auth': ['auth', 'authentication', 'login', 'oauth', 'token', 'session'],
|
|
'deploy': ['deploy', 'deployment', 'docker', 'kubernetes', 'ci/cd'],
|
|
'testing': ['test', 'testing', 'spec', 'jest', 'pytest', 'coverage'],
|
|
'docs': ['docs', 'documentation', 'readme', 'guide', 'wiki']
|
|
}
|
|
|
|
for component, keywords in component_mappings.items():
|
|
if any(keyword in context_lower for keyword in keywords):
|
|
label = self._find_label(label_lookup, 'component', component)
|
|
if label and label not in suggested:
|
|
suggested.append(label)
|
|
|
|
# Tech stack detection
|
|
tech_mappings = {
|
|
'python': ['python', 'fastapi', 'django', 'flask', 'pytest'],
|
|
'javascript': ['javascript', 'js', 'node', 'npm', 'yarn'],
|
|
'docker': ['docker', 'dockerfile', 'container', 'compose'],
|
|
'postgresql': ['postgres', 'postgresql', 'psql', 'sql'],
|
|
'redis': ['redis', 'cache', 'session store'],
|
|
'vue': ['vue', 'vuejs', 'nuxt'],
|
|
'fastapi': ['fastapi', 'pydantic', 'starlette']
|
|
}
|
|
|
|
for tech, keywords in tech_mappings.items():
|
|
if any(keyword in context_lower for keyword in keywords):
|
|
label = self._find_label(label_lookup, 'tech', tech)
|
|
if label and label not in suggested:
|
|
suggested.append(label)
|
|
|
|
# Source detection (based on git branch or context)
|
|
source_label = None
|
|
if 'development' in context_lower or 'dev/' in context_lower:
|
|
source_label = self._find_label(label_lookup, 'source', 'development')
|
|
elif 'staging' in context_lower or 'stage/' in context_lower:
|
|
source_label = self._find_label(label_lookup, 'source', 'staging')
|
|
elif 'production' in context_lower or 'prod' in context_lower:
|
|
source_label = self._find_label(label_lookup, 'source', 'production')
|
|
if source_label:
|
|
suggested.append(source_label)
|
|
|
|
# Risk detection
|
|
risk_label = None
|
|
if any(word in context_lower for word in ['breaking', 'breaking change', 'major', 'risky']):
|
|
risk_label = self._find_label(label_lookup, 'risk', 'high')
|
|
elif any(word in context_lower for word in ['safe', 'low risk', 'minor']):
|
|
risk_label = self._find_label(label_lookup, 'risk', 'low')
|
|
if risk_label:
|
|
suggested.append(risk_label)
|
|
|
|
logger.info(f"Suggested {len(suggested)} labels based on context and {len(label_names)} available labels")
|
|
return suggested
|
|
|
|
def _build_label_lookup(self, label_names: List[str]) -> Dict[str, Dict[str, str]]:
|
|
"""
|
|
Build a lookup dictionary for label matching.
|
|
|
|
Supports various label formats:
|
|
- Slash format: Type/Bug, Priority/High
|
|
- Colon-space format: Type: Bug, Priority: High
|
|
- Colon format: Type:Bug
|
|
|
|
Args:
|
|
label_names: List of actual label names from repository
|
|
|
|
Returns:
|
|
Nested dict: {category: {value: actual_label_name}}
|
|
"""
|
|
lookup: Dict[str, Dict[str, str]] = {}
|
|
|
|
for label in label_names:
|
|
# Try different separator patterns
|
|
# Pattern: Category<separator>Value
|
|
# Separators: /, : , :
|
|
match = re.match(r'^([^/:]+)(?:/|:\s*|:)(.+)$', label)
|
|
if match:
|
|
category = match.group(1).lower().rstrip('s') # Normalize: "Efforts" -> "effort"
|
|
value = match.group(2).lower()
|
|
|
|
if category not in lookup:
|
|
lookup[category] = {}
|
|
lookup[category][value] = label
|
|
|
|
return lookup
|
|
|
|
def _find_label(self, lookup: Dict[str, Dict[str, str]], category: str, value: str) -> Optional[str]:
|
|
"""
|
|
Find actual label name from lookup.
|
|
|
|
Args:
|
|
lookup: Label lookup dictionary
|
|
category: Category to search (e.g., 'type', 'priority')
|
|
value: Value to find (e.g., 'bug', 'high')
|
|
|
|
Returns:
|
|
Actual label name if found, None otherwise
|
|
"""
|
|
category_lower = category.lower().rstrip('s') # Normalize
|
|
value_lower = value.lower()
|
|
|
|
if category_lower in lookup and value_lower in lookup[category_lower]:
|
|
return lookup[category_lower][value_lower]
|
|
|
|
return None
|
|
|
|
# Organization-level label categories (workflow labels shared across repos)
|
|
ORG_LABEL_CATEGORIES = {'agent', 'complexity', 'effort', 'efforts', 'priority', 'risk', 'source', 'type'}
|
|
|
|
# Repository-level label categories (project-specific labels)
|
|
REPO_LABEL_CATEGORIES = {'component', 'tech'}
|
|
|
|
async def create_label_smart(
|
|
self,
|
|
name: str,
|
|
color: str,
|
|
description: Optional[str] = None,
|
|
repo: Optional[str] = None
|
|
) -> Dict:
|
|
"""
|
|
Create a label at the appropriate level (org or repo) based on category.
|
|
Skips if label already exists (checks both org and repo levels).
|
|
|
|
Organization labels: Agent, Complexity, Effort, Priority, Risk, Source, Type
|
|
Repository labels: Component, Tech
|
|
|
|
Args:
|
|
name: Label name (e.g., 'Type/Bug', 'Component/Backend')
|
|
color: Hex color code
|
|
description: Optional label description
|
|
repo: Repository in 'owner/repo' format
|
|
|
|
Returns:
|
|
Created label dictionary with 'level' key, or 'skipped' if already exists
|
|
"""
|
|
loop = asyncio.get_event_loop()
|
|
|
|
target_repo = repo or self.gitea.repo
|
|
if not target_repo or '/' not in target_repo:
|
|
raise ValueError("Use 'owner/repo' format (e.g. 'org/repo-name')")
|
|
|
|
owner = target_repo.split('/')[0]
|
|
is_org = await loop.run_in_executor(
|
|
None,
|
|
lambda: self.gitea.is_org_repo(target_repo)
|
|
)
|
|
|
|
# Fetch existing labels to check for duplicates
|
|
existing_labels = await self.get_labels(target_repo)
|
|
all_existing = existing_labels.get('organization', []) + existing_labels.get('repository', [])
|
|
existing_names = [label['name'].lower() for label in all_existing]
|
|
|
|
# Normalize the new label name for comparison
|
|
name_normalized = name.lower()
|
|
|
|
# Also check for format variations (Type/Bug vs Type: Bug)
|
|
name_variations = [name_normalized]
|
|
if '/' in name:
|
|
name_variations.append(name.replace('/', ': ').lower())
|
|
name_variations.append(name.replace('/', ':').lower())
|
|
elif ': ' in name:
|
|
name_variations.append(name.replace(': ', '/').lower())
|
|
elif ':' in name:
|
|
name_variations.append(name.replace(':', '/').lower())
|
|
|
|
# Check if label already exists in any format
|
|
for variation in name_variations:
|
|
if variation in existing_names:
|
|
logger.info(f"Label '{name}' already exists (found as '{variation}'), skipping")
|
|
return {
|
|
'name': name,
|
|
'skipped': True,
|
|
'reason': f"Label already exists",
|
|
'level': 'existing'
|
|
}
|
|
|
|
# Parse category from label name
|
|
category = None
|
|
if '/' in name:
|
|
category = name.split('/')[0].lower().rstrip('s')
|
|
elif ':' in name:
|
|
category = name.split(':')[0].strip().lower().rstrip('s')
|
|
|
|
# If it's an org repo and the category is an org-level category, create at org level
|
|
if is_org and category in self.ORG_LABEL_CATEGORIES:
|
|
result = await loop.run_in_executor(
|
|
None,
|
|
lambda: self.gitea.create_org_label(owner, name, color, description)
|
|
)
|
|
# Handle unexpected response types (API may return list or non-dict)
|
|
if not isinstance(result, dict):
|
|
logger.error(f"Unexpected API response type for org label: {type(result)} - {result}")
|
|
return {
|
|
'name': name,
|
|
'error': True,
|
|
'reason': f"API returned {type(result).__name__} instead of dict: {result}",
|
|
'level': 'organization'
|
|
}
|
|
result['level'] = 'organization'
|
|
result['skipped'] = False
|
|
logger.info(f"Created organization label '{name}' in {owner}")
|
|
else:
|
|
# Create at repo level
|
|
result = await loop.run_in_executor(
|
|
None,
|
|
lambda: self.gitea.create_label(name, color, description, target_repo)
|
|
)
|
|
# Handle unexpected response types (API may return list or non-dict)
|
|
if not isinstance(result, dict):
|
|
logger.error(f"Unexpected API response type for repo label: {type(result)} - {result}")
|
|
return {
|
|
'name': name,
|
|
'error': True,
|
|
'reason': f"API returned {type(result).__name__} instead of dict: {result}",
|
|
'level': 'repository'
|
|
}
|
|
result['level'] = 'repository'
|
|
result['skipped'] = False
|
|
logger.info(f"Created repository label '{name}' in {target_repo}")
|
|
|
|
return result
|