Add missing create_pull_request tool to Gitea MCP server. This completes the PR lifecycle - previously only had list/get/review/comment tools. - Add create_pull_request to GiteaClient - Add async wrapper to PullRequestTools with branch permissions - Register tool in server.py with proper schema - Parameters: title, body, head, base, labels (optional) - Branch-aware security: only allowed on development/feature branches Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
829 lines
29 KiB
Python
829 lines
29 KiB
Python
"""
|
|
Gitea API client for interacting with Gitea API.
|
|
|
|
Provides synchronous methods for:
|
|
- Issue CRUD operations
|
|
- Label management
|
|
- Repository operations
|
|
- PMO multi-repo aggregation
|
|
- Wiki operations (lessons learned)
|
|
- Milestone management
|
|
- Issue dependencies
|
|
"""
|
|
import requests
|
|
import logging
|
|
import re
|
|
from typing import List, Dict, Optional
|
|
from .config import GiteaConfig
|
|
|
|
logging.basicConfig(level=logging.INFO)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class GiteaClient:
|
|
"""Client for interacting with Gitea API"""
|
|
|
|
def __init__(self):
|
|
"""Initialize Gitea client with configuration"""
|
|
config = GiteaConfig()
|
|
config_dict = config.load()
|
|
|
|
self.base_url = config_dict['api_url']
|
|
self.token = config_dict['api_token']
|
|
self.repo = config_dict.get('repo') # Optional default repo in owner/repo format
|
|
self.mode = config_dict['mode']
|
|
|
|
self.session = requests.Session()
|
|
self.session.headers.update({
|
|
'Authorization': f'token {self.token}',
|
|
'Content-Type': 'application/json'
|
|
})
|
|
|
|
logger.info(f"Gitea client initialized in {self.mode} mode")
|
|
|
|
def _parse_repo(self, repo: Optional[str] = None) -> tuple:
|
|
"""Parse owner/repo from input. Always requires 'owner/repo' format."""
|
|
target = repo or self.repo
|
|
if not target or '/' not in target:
|
|
raise ValueError("Use 'owner/repo' format (e.g. 'org/repo-name')")
|
|
parts = target.split('/', 1)
|
|
return parts[0], parts[1]
|
|
|
|
def list_issues(
|
|
self,
|
|
state: str = 'open',
|
|
labels: Optional[List[str]] = None,
|
|
repo: Optional[str] = None
|
|
) -> List[Dict]:
|
|
"""
|
|
List issues from Gitea repository.
|
|
|
|
Args:
|
|
state: Issue state (open, closed, all)
|
|
labels: Filter by labels
|
|
repo: Repository in 'owner/repo' format
|
|
|
|
Returns:
|
|
List of issue dictionaries
|
|
"""
|
|
owner, target_repo = self._parse_repo(repo)
|
|
url = f"{self.base_url}/repos/{owner}/{target_repo}/issues"
|
|
params = {'state': state}
|
|
if labels:
|
|
params['labels'] = ','.join(labels)
|
|
logger.info(f"Listing issues from {owner}/{target_repo} with state={state}")
|
|
response = self.session.get(url, params=params)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
|
|
def get_issue(
|
|
self,
|
|
issue_number: int,
|
|
repo: Optional[str] = None
|
|
) -> Dict:
|
|
"""Get specific issue details."""
|
|
owner, target_repo = self._parse_repo(repo)
|
|
url = f"{self.base_url}/repos/{owner}/{target_repo}/issues/{issue_number}"
|
|
logger.info(f"Getting issue #{issue_number} from {owner}/{target_repo}")
|
|
response = self.session.get(url)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
|
|
def create_issue(
|
|
self,
|
|
title: str,
|
|
body: str,
|
|
labels: Optional[List[str]] = None,
|
|
repo: Optional[str] = None
|
|
) -> Dict:
|
|
"""Create a new issue in Gitea."""
|
|
owner, target_repo = self._parse_repo(repo)
|
|
url = f"{self.base_url}/repos/{owner}/{target_repo}/issues"
|
|
data = {'title': title, 'body': body}
|
|
if labels:
|
|
label_ids = self._resolve_label_ids(labels, owner, target_repo)
|
|
data['labels'] = label_ids
|
|
logger.info(f"Creating issue in {owner}/{target_repo}: {title}")
|
|
response = self.session.post(url, json=data)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
|
|
def _resolve_label_ids(self, label_names: List[str], owner: str, repo: str) -> List[int]:
|
|
"""Convert label names to label IDs."""
|
|
full_repo = f"{owner}/{repo}"
|
|
|
|
# Only fetch org labels if repo belongs to an organization
|
|
org_labels = []
|
|
if self.is_org_repo(full_repo):
|
|
org_labels = self.get_org_labels(owner)
|
|
|
|
repo_labels = self.get_labels(full_repo)
|
|
all_labels = org_labels + repo_labels
|
|
label_map = {label['name']: label['id'] for label in all_labels}
|
|
label_ids = []
|
|
for name in label_names:
|
|
if name in label_map:
|
|
label_ids.append(label_map[name])
|
|
else:
|
|
logger.warning(f"Label '{name}' not found, skipping")
|
|
return label_ids
|
|
|
|
def update_issue(
|
|
self,
|
|
issue_number: int,
|
|
title: Optional[str] = None,
|
|
body: Optional[str] = None,
|
|
state: Optional[str] = None,
|
|
labels: Optional[List[str]] = None,
|
|
repo: Optional[str] = None
|
|
) -> Dict:
|
|
"""Update existing issue. Repo must be 'owner/repo' format."""
|
|
owner, target_repo = self._parse_repo(repo)
|
|
url = f"{self.base_url}/repos/{owner}/{target_repo}/issues/{issue_number}"
|
|
data = {}
|
|
if title is not None:
|
|
data['title'] = title
|
|
if body is not None:
|
|
data['body'] = body
|
|
if state is not None:
|
|
data['state'] = state
|
|
if labels is not None:
|
|
data['labels'] = labels
|
|
logger.info(f"Updating issue #{issue_number} in {owner}/{target_repo}")
|
|
response = self.session.patch(url, json=data)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
|
|
def add_comment(
|
|
self,
|
|
issue_number: int,
|
|
comment: str,
|
|
repo: Optional[str] = None
|
|
) -> Dict:
|
|
"""Add comment to issue. Repo must be 'owner/repo' format."""
|
|
owner, target_repo = self._parse_repo(repo)
|
|
url = f"{self.base_url}/repos/{owner}/{target_repo}/issues/{issue_number}/comments"
|
|
data = {'body': comment}
|
|
logger.info(f"Adding comment to issue #{issue_number} in {owner}/{target_repo}")
|
|
response = self.session.post(url, json=data)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
|
|
def get_labels(self, repo: Optional[str] = None) -> List[Dict]:
|
|
"""Get all labels from repository. Repo must be 'owner/repo' format."""
|
|
owner, target_repo = self._parse_repo(repo)
|
|
url = f"{self.base_url}/repos/{owner}/{target_repo}/labels"
|
|
logger.info(f"Getting labels from {owner}/{target_repo}")
|
|
response = self.session.get(url)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
|
|
def get_org_labels(self, org: str) -> List[Dict]:
|
|
"""Get organization-level labels. Org is the organization name."""
|
|
url = f"{self.base_url}/orgs/{org}/labels"
|
|
logger.info(f"Getting organization labels for {org}")
|
|
response = self.session.get(url)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
|
|
def list_repos(self, org: str) -> List[Dict]:
|
|
"""List all repositories in organization. Org is the organization name."""
|
|
url = f"{self.base_url}/orgs/{org}/repos"
|
|
logger.info(f"Listing all repositories for organization {org}")
|
|
response = self.session.get(url)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
|
|
def aggregate_issues(
|
|
self,
|
|
org: str,
|
|
state: str = 'open',
|
|
labels: Optional[List[str]] = None
|
|
) -> Dict[str, List[Dict]]:
|
|
"""Fetch issues across all repositories in org."""
|
|
repos = self.list_repos(org)
|
|
aggregated = {}
|
|
logger.info(f"Aggregating issues across {len(repos)} repositories")
|
|
for repo in repos:
|
|
repo_name = repo['name']
|
|
try:
|
|
issues = self.list_issues(
|
|
state=state,
|
|
labels=labels,
|
|
repo=f"{org}/{repo_name}"
|
|
)
|
|
if issues:
|
|
aggregated[repo_name] = issues
|
|
logger.info(f"Found {len(issues)} issues in {repo_name}")
|
|
except Exception as e:
|
|
logger.error(f"Error fetching issues from {repo_name}: {e}")
|
|
|
|
return aggregated
|
|
|
|
# ========================================
|
|
# WIKI OPERATIONS (Lessons Learned)
|
|
# ========================================
|
|
|
|
def list_wiki_pages(self, repo: Optional[str] = None) -> List[Dict]:
|
|
"""List all wiki pages in repository."""
|
|
owner, target_repo = self._parse_repo(repo)
|
|
url = f"{self.base_url}/repos/{owner}/{target_repo}/wiki/pages"
|
|
logger.info(f"Listing wiki pages from {owner}/{target_repo}")
|
|
response = self.session.get(url)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
|
|
def get_wiki_page(
|
|
self,
|
|
page_name: str,
|
|
repo: Optional[str] = None
|
|
) -> Dict:
|
|
"""Get a specific wiki page by name."""
|
|
from urllib.parse import quote
|
|
owner, target_repo = self._parse_repo(repo)
|
|
# URL-encode the page_name to handle special characters like ':'
|
|
encoded_page_name = quote(page_name, safe='')
|
|
url = f"{self.base_url}/repos/{owner}/{target_repo}/wiki/page/{encoded_page_name}"
|
|
logger.info(f"Getting wiki page '{page_name}' from {owner}/{target_repo}")
|
|
response = self.session.get(url)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
|
|
def create_wiki_page(
|
|
self,
|
|
title: str,
|
|
content: str,
|
|
repo: Optional[str] = None
|
|
) -> Dict:
|
|
"""Create a new wiki page."""
|
|
owner, target_repo = self._parse_repo(repo)
|
|
url = f"{self.base_url}/repos/{owner}/{target_repo}/wiki/new"
|
|
data = {
|
|
'title': title,
|
|
'content_base64': self._encode_base64(content)
|
|
}
|
|
logger.info(f"Creating wiki page '{title}' in {owner}/{target_repo}")
|
|
response = self.session.post(url, json=data)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
|
|
def update_wiki_page(
|
|
self,
|
|
page_name: str,
|
|
content: str,
|
|
repo: Optional[str] = None
|
|
) -> Dict:
|
|
"""Update an existing wiki page."""
|
|
from urllib.parse import quote
|
|
owner, target_repo = self._parse_repo(repo)
|
|
# URL-encode the page_name to handle special characters like ':'
|
|
encoded_page_name = quote(page_name, safe='')
|
|
url = f"{self.base_url}/repos/{owner}/{target_repo}/wiki/page/{encoded_page_name}"
|
|
data = {
|
|
'title': page_name, # CRITICAL: include title to preserve page name
|
|
'content_base64': self._encode_base64(content)
|
|
}
|
|
logger.info(f"Updating wiki page '{page_name}' in {owner}/{target_repo}")
|
|
response = self.session.patch(url, json=data)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
|
|
def delete_wiki_page(
|
|
self,
|
|
page_name: str,
|
|
repo: Optional[str] = None
|
|
) -> bool:
|
|
"""Delete a wiki page."""
|
|
from urllib.parse import quote
|
|
owner, target_repo = self._parse_repo(repo)
|
|
# URL-encode the page_name to handle special characters like ':'
|
|
encoded_page_name = quote(page_name, safe='')
|
|
url = f"{self.base_url}/repos/{owner}/{target_repo}/wiki/page/{encoded_page_name}"
|
|
logger.info(f"Deleting wiki page '{page_name}' from {owner}/{target_repo}")
|
|
response = self.session.delete(url)
|
|
response.raise_for_status()
|
|
return True
|
|
|
|
def _encode_base64(self, content: str) -> str:
|
|
"""Encode content to base64 for wiki API."""
|
|
import base64
|
|
return base64.b64encode(content.encode('utf-8')).decode('utf-8')
|
|
|
|
def _decode_base64(self, content: str) -> str:
|
|
"""Decode base64 content from wiki API."""
|
|
import base64
|
|
return base64.b64decode(content.encode('utf-8')).decode('utf-8')
|
|
|
|
def search_wiki_pages(
|
|
self,
|
|
query: str,
|
|
repo: Optional[str] = None
|
|
) -> List[Dict]:
|
|
"""Search wiki pages by content (client-side filtering)."""
|
|
pages = self.list_wiki_pages(repo)
|
|
results = []
|
|
query_lower = query.lower()
|
|
for page in pages:
|
|
if query_lower in page.get('title', '').lower():
|
|
results.append(page)
|
|
return results
|
|
|
|
def create_lesson(
|
|
self,
|
|
title: str,
|
|
content: str,
|
|
tags: List[str],
|
|
category: str = "sprints",
|
|
repo: Optional[str] = None
|
|
) -> Dict:
|
|
"""Create a lessons learned entry in the wiki."""
|
|
# Sanitize title for wiki page name
|
|
page_name = f"lessons/{category}/{self._sanitize_page_name(title)}"
|
|
|
|
# Add tags as metadata at the end of content
|
|
full_content = f"{content}\n\n---\n**Tags:** {', '.join(tags)}"
|
|
|
|
return self.create_wiki_page(page_name, full_content, repo)
|
|
|
|
def search_lessons(
|
|
self,
|
|
query: Optional[str] = None,
|
|
tags: Optional[List[str]] = None,
|
|
repo: Optional[str] = None
|
|
) -> List[Dict]:
|
|
"""Search lessons learned by query and/or tags."""
|
|
pages = self.list_wiki_pages(repo)
|
|
results = []
|
|
|
|
for page in pages:
|
|
title = page.get('title', '')
|
|
# Filter to only lessons (pages starting with lessons/)
|
|
if not title.startswith('lessons/'):
|
|
continue
|
|
|
|
# If query provided, check if it matches title
|
|
if query:
|
|
if query.lower() not in title.lower():
|
|
continue
|
|
|
|
# Get full page content for tag matching if tags provided
|
|
if tags:
|
|
try:
|
|
full_page = self.get_wiki_page(title, repo)
|
|
content = self._decode_base64(full_page.get('content_base64', ''))
|
|
# Check if any tag is in the content
|
|
if not any(tag.lower() in content.lower() for tag in tags):
|
|
continue
|
|
except Exception:
|
|
continue
|
|
|
|
results.append(page)
|
|
|
|
return results
|
|
|
|
def _sanitize_page_name(self, title: str) -> str:
|
|
"""Convert title to valid wiki page name."""
|
|
# Replace spaces with hyphens, remove special chars
|
|
name = re.sub(r'[^\w\s-]', '', title)
|
|
name = re.sub(r'[\s]+', '-', name)
|
|
return name.lower()
|
|
|
|
# ========================================
|
|
# MILESTONE OPERATIONS
|
|
# ========================================
|
|
|
|
def list_milestones(
|
|
self,
|
|
state: str = 'open',
|
|
repo: Optional[str] = None
|
|
) -> List[Dict]:
|
|
"""List all milestones in repository."""
|
|
owner, target_repo = self._parse_repo(repo)
|
|
url = f"{self.base_url}/repos/{owner}/{target_repo}/milestones"
|
|
params = {'state': state}
|
|
logger.info(f"Listing milestones from {owner}/{target_repo}")
|
|
response = self.session.get(url, params=params)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
|
|
def get_milestone(
|
|
self,
|
|
milestone_id: int,
|
|
repo: Optional[str] = None
|
|
) -> Dict:
|
|
"""Get a specific milestone by ID."""
|
|
owner, target_repo = self._parse_repo(repo)
|
|
url = f"{self.base_url}/repos/{owner}/{target_repo}/milestones/{milestone_id}"
|
|
logger.info(f"Getting milestone #{milestone_id} from {owner}/{target_repo}")
|
|
response = self.session.get(url)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
|
|
def create_milestone(
|
|
self,
|
|
title: str,
|
|
description: Optional[str] = None,
|
|
due_on: Optional[str] = None,
|
|
repo: Optional[str] = None
|
|
) -> Dict:
|
|
"""Create a new milestone."""
|
|
owner, target_repo = self._parse_repo(repo)
|
|
url = f"{self.base_url}/repos/{owner}/{target_repo}/milestones"
|
|
data = {'title': title}
|
|
if description:
|
|
data['description'] = description
|
|
if due_on:
|
|
data['due_on'] = due_on
|
|
logger.info(f"Creating milestone '{title}' in {owner}/{target_repo}")
|
|
response = self.session.post(url, json=data)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
|
|
def update_milestone(
|
|
self,
|
|
milestone_id: int,
|
|
title: Optional[str] = None,
|
|
description: Optional[str] = None,
|
|
state: Optional[str] = None,
|
|
due_on: Optional[str] = None,
|
|
repo: Optional[str] = None
|
|
) -> Dict:
|
|
"""Update an existing milestone."""
|
|
owner, target_repo = self._parse_repo(repo)
|
|
url = f"{self.base_url}/repos/{owner}/{target_repo}/milestones/{milestone_id}"
|
|
data = {}
|
|
if title is not None:
|
|
data['title'] = title
|
|
if description is not None:
|
|
data['description'] = description
|
|
if state is not None:
|
|
data['state'] = state
|
|
if due_on is not None:
|
|
data['due_on'] = due_on
|
|
logger.info(f"Updating milestone #{milestone_id} in {owner}/{target_repo}")
|
|
response = self.session.patch(url, json=data)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
|
|
def delete_milestone(
|
|
self,
|
|
milestone_id: int,
|
|
repo: Optional[str] = None
|
|
) -> bool:
|
|
"""Delete a milestone."""
|
|
owner, target_repo = self._parse_repo(repo)
|
|
url = f"{self.base_url}/repos/{owner}/{target_repo}/milestones/{milestone_id}"
|
|
logger.info(f"Deleting milestone #{milestone_id} from {owner}/{target_repo}")
|
|
response = self.session.delete(url)
|
|
response.raise_for_status()
|
|
return True
|
|
|
|
# ========================================
|
|
# ISSUE DEPENDENCY OPERATIONS
|
|
# ========================================
|
|
|
|
def list_issue_dependencies(
|
|
self,
|
|
issue_number: int,
|
|
repo: Optional[str] = None
|
|
) -> List[Dict]:
|
|
"""List all dependencies for an issue (issues that block this one)."""
|
|
owner, target_repo = self._parse_repo(repo)
|
|
url = f"{self.base_url}/repos/{owner}/{target_repo}/issues/{issue_number}/dependencies"
|
|
logger.info(f"Listing dependencies for issue #{issue_number} in {owner}/{target_repo}")
|
|
response = self.session.get(url)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
|
|
def create_issue_dependency(
|
|
self,
|
|
issue_number: int,
|
|
depends_on: int,
|
|
repo: Optional[str] = None
|
|
) -> Dict:
|
|
"""Create a dependency (issue_number depends on depends_on)."""
|
|
owner, target_repo = self._parse_repo(repo)
|
|
url = f"{self.base_url}/repos/{owner}/{target_repo}/issues/{issue_number}/dependencies"
|
|
data = {
|
|
'dependentIssue': {
|
|
'owner': owner,
|
|
'repo': target_repo,
|
|
'index': depends_on
|
|
}
|
|
}
|
|
logger.info(f"Creating dependency: #{issue_number} depends on #{depends_on} in {owner}/{target_repo}")
|
|
response = self.session.post(url, json=data)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
|
|
def remove_issue_dependency(
|
|
self,
|
|
issue_number: int,
|
|
depends_on: int,
|
|
repo: Optional[str] = None
|
|
) -> bool:
|
|
"""Remove a dependency between issues."""
|
|
owner, target_repo = self._parse_repo(repo)
|
|
url = f"{self.base_url}/repos/{owner}/{target_repo}/issues/{issue_number}/dependencies"
|
|
data = {
|
|
'dependentIssue': {
|
|
'owner': owner,
|
|
'repo': target_repo,
|
|
'index': depends_on
|
|
}
|
|
}
|
|
logger.info(f"Removing dependency: #{issue_number} no longer depends on #{depends_on}")
|
|
response = self.session.delete(url, json=data)
|
|
response.raise_for_status()
|
|
return True
|
|
|
|
def list_issue_blocks(
|
|
self,
|
|
issue_number: int,
|
|
repo: Optional[str] = None
|
|
) -> List[Dict]:
|
|
"""List all issues that this issue blocks."""
|
|
owner, target_repo = self._parse_repo(repo)
|
|
url = f"{self.base_url}/repos/{owner}/{target_repo}/issues/{issue_number}/blocks"
|
|
logger.info(f"Listing issues blocked by #{issue_number} in {owner}/{target_repo}")
|
|
response = self.session.get(url)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
|
|
# ========================================
|
|
# REPOSITORY VALIDATION
|
|
# ========================================
|
|
|
|
def get_repo_info(self, repo: Optional[str] = None) -> Dict:
|
|
"""Get repository information including owner type."""
|
|
owner, target_repo = self._parse_repo(repo)
|
|
url = f"{self.base_url}/repos/{owner}/{target_repo}"
|
|
logger.info(f"Getting repo info for {owner}/{target_repo}")
|
|
response = self.session.get(url)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
|
|
def is_org_repo(self, repo: Optional[str] = None) -> bool:
|
|
"""
|
|
Check if repository belongs to an organization (not a user).
|
|
|
|
Uses the /orgs/{owner} endpoint to reliably detect organizations,
|
|
as the owner.type field in repo info may be null in some Gitea versions.
|
|
"""
|
|
owner, _ = self._parse_repo(repo)
|
|
return self._is_organization(owner)
|
|
|
|
def _is_organization(self, owner: str) -> bool:
|
|
"""
|
|
Check if an owner is an organization by querying the orgs endpoint.
|
|
|
|
Args:
|
|
owner: The owner name to check
|
|
|
|
Returns:
|
|
True if owner is an organization, False if user or unknown
|
|
"""
|
|
url = f"{self.base_url}/orgs/{owner}"
|
|
try:
|
|
response = self.session.get(url)
|
|
# 200 = organization exists, 404 = not an organization (user account)
|
|
return response.status_code == 200
|
|
except Exception as e:
|
|
logger.warning(f"Failed to check if {owner} is organization: {e}")
|
|
return False
|
|
|
|
def get_branch_protection(
|
|
self,
|
|
branch: str,
|
|
repo: Optional[str] = None
|
|
) -> Optional[Dict]:
|
|
"""Get branch protection rules for a branch."""
|
|
owner, target_repo = self._parse_repo(repo)
|
|
url = f"{self.base_url}/repos/{owner}/{target_repo}/branch_protections/{branch}"
|
|
logger.info(f"Getting branch protection for {branch} in {owner}/{target_repo}")
|
|
try:
|
|
response = self.session.get(url)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
except requests.exceptions.HTTPError as e:
|
|
if e.response.status_code == 404:
|
|
return None # No protection rules
|
|
raise
|
|
|
|
def create_label(
|
|
self,
|
|
name: str,
|
|
color: str,
|
|
description: Optional[str] = None,
|
|
repo: Optional[str] = None
|
|
) -> Dict:
|
|
"""Create a new label in the repository."""
|
|
owner, target_repo = self._parse_repo(repo)
|
|
url = f"{self.base_url}/repos/{owner}/{target_repo}/labels"
|
|
data = {
|
|
'name': name,
|
|
'color': color.lstrip('#') # Remove # if present
|
|
}
|
|
if description:
|
|
data['description'] = description
|
|
logger.info(f"Creating label '{name}' in {owner}/{target_repo}")
|
|
response = self.session.post(url, json=data)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
|
|
def create_org_label(
|
|
self,
|
|
org: str,
|
|
name: str,
|
|
color: str,
|
|
description: Optional[str] = None
|
|
) -> Dict:
|
|
"""
|
|
Create a new label at the organization level.
|
|
|
|
Organization labels are shared across all repositories in the org.
|
|
Use this for workflow labels (Type, Priority, Complexity, Effort, etc.)
|
|
|
|
Args:
|
|
org: Organization name
|
|
name: Label name (e.g., 'Type/Bug', 'Priority/High')
|
|
color: Hex color code (with or without #)
|
|
description: Optional label description
|
|
|
|
Returns:
|
|
Created label dictionary
|
|
"""
|
|
url = f"{self.base_url}/orgs/{org}/labels"
|
|
data = {
|
|
'name': name,
|
|
'color': color.lstrip('#') # Remove # if present
|
|
}
|
|
if description:
|
|
data['description'] = description
|
|
logger.info(f"Creating organization label '{name}' in {org}")
|
|
response = self.session.post(url, json=data)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
|
|
# ========================================
|
|
# PULL REQUEST OPERATIONS
|
|
# ========================================
|
|
|
|
def list_pull_requests(
|
|
self,
|
|
state: str = 'open',
|
|
sort: str = 'recentupdate',
|
|
labels: Optional[List[str]] = None,
|
|
repo: Optional[str] = None
|
|
) -> List[Dict]:
|
|
"""
|
|
List pull requests from Gitea repository.
|
|
|
|
Args:
|
|
state: PR state (open, closed, all)
|
|
sort: Sort order (oldest, recentupdate, leastupdate, mostcomment, leastcomment, priority)
|
|
labels: Filter by labels
|
|
repo: Repository in 'owner/repo' format
|
|
|
|
Returns:
|
|
List of pull request dictionaries
|
|
"""
|
|
owner, target_repo = self._parse_repo(repo)
|
|
url = f"{self.base_url}/repos/{owner}/{target_repo}/pulls"
|
|
params = {'state': state, 'sort': sort}
|
|
if labels:
|
|
params['labels'] = ','.join(labels)
|
|
logger.info(f"Listing PRs from {owner}/{target_repo} with state={state}")
|
|
response = self.session.get(url, params=params)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
|
|
def get_pull_request(
|
|
self,
|
|
pr_number: int,
|
|
repo: Optional[str] = None
|
|
) -> Dict:
|
|
"""Get specific pull request details."""
|
|
owner, target_repo = self._parse_repo(repo)
|
|
url = f"{self.base_url}/repos/{owner}/{target_repo}/pulls/{pr_number}"
|
|
logger.info(f"Getting PR #{pr_number} from {owner}/{target_repo}")
|
|
response = self.session.get(url)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
|
|
def get_pr_diff(
|
|
self,
|
|
pr_number: int,
|
|
repo: Optional[str] = None
|
|
) -> str:
|
|
"""Get the diff for a pull request."""
|
|
owner, target_repo = self._parse_repo(repo)
|
|
url = f"{self.base_url}/repos/{owner}/{target_repo}/pulls/{pr_number}.diff"
|
|
logger.info(f"Getting diff for PR #{pr_number} from {owner}/{target_repo}")
|
|
response = self.session.get(url)
|
|
response.raise_for_status()
|
|
return response.text
|
|
|
|
def get_pr_comments(
|
|
self,
|
|
pr_number: int,
|
|
repo: Optional[str] = None
|
|
) -> List[Dict]:
|
|
"""Get comments on a pull request (uses issue comments endpoint)."""
|
|
owner, target_repo = self._parse_repo(repo)
|
|
# PRs share comment endpoint with issues in Gitea
|
|
url = f"{self.base_url}/repos/{owner}/{target_repo}/issues/{pr_number}/comments"
|
|
logger.info(f"Getting comments for PR #{pr_number} from {owner}/{target_repo}")
|
|
response = self.session.get(url)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
|
|
def create_pr_review(
|
|
self,
|
|
pr_number: int,
|
|
body: str,
|
|
event: str = 'COMMENT',
|
|
comments: Optional[List[Dict]] = None,
|
|
repo: Optional[str] = None
|
|
) -> Dict:
|
|
"""
|
|
Create a review on a pull request.
|
|
|
|
Args:
|
|
pr_number: Pull request number
|
|
body: Review body/summary
|
|
event: Review action (APPROVE, REQUEST_CHANGES, COMMENT)
|
|
comments: Optional list of inline comments with path, position, body
|
|
repo: Repository in 'owner/repo' format
|
|
|
|
Returns:
|
|
Created review dictionary
|
|
"""
|
|
owner, target_repo = self._parse_repo(repo)
|
|
url = f"{self.base_url}/repos/{owner}/{target_repo}/pulls/{pr_number}/reviews"
|
|
data = {
|
|
'body': body,
|
|
'event': event
|
|
}
|
|
if comments:
|
|
data['comments'] = comments
|
|
logger.info(f"Creating review on PR #{pr_number} in {owner}/{target_repo}")
|
|
response = self.session.post(url, json=data)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
|
|
def add_pr_comment(
|
|
self,
|
|
pr_number: int,
|
|
body: str,
|
|
repo: Optional[str] = None
|
|
) -> Dict:
|
|
"""Add a general comment to a pull request (uses issue comment endpoint)."""
|
|
owner, target_repo = self._parse_repo(repo)
|
|
# PRs share comment endpoint with issues in Gitea
|
|
url = f"{self.base_url}/repos/{owner}/{target_repo}/issues/{pr_number}/comments"
|
|
data = {'body': body}
|
|
logger.info(f"Adding comment to PR #{pr_number} in {owner}/{target_repo}")
|
|
response = self.session.post(url, json=data)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
|
|
def create_pull_request(
|
|
self,
|
|
title: str,
|
|
body: str,
|
|
head: str,
|
|
base: str,
|
|
labels: Optional[List[str]] = None,
|
|
repo: Optional[str] = None
|
|
) -> Dict:
|
|
"""
|
|
Create a new pull request.
|
|
|
|
Args:
|
|
title: PR title
|
|
body: PR description/body
|
|
head: Source branch name (the branch with changes)
|
|
base: Target branch name (the branch to merge into)
|
|
labels: Optional list of label names
|
|
repo: Repository in 'owner/repo' format
|
|
|
|
Returns:
|
|
Created pull request dictionary
|
|
"""
|
|
owner, target_repo = self._parse_repo(repo)
|
|
url = f"{self.base_url}/repos/{owner}/{target_repo}/pulls"
|
|
data = {
|
|
'title': title,
|
|
'body': body,
|
|
'head': head,
|
|
'base': base
|
|
}
|
|
if labels:
|
|
label_ids = self._resolve_label_ids(labels, owner, target_repo)
|
|
data['labels'] = label_ids
|
|
logger.info(f"Creating PR '{title}' in {owner}/{target_repo}: {head} -> {base}")
|
|
response = self.session.post(url, json=data)
|
|
response.raise_for_status()
|
|
return response.json()
|