refactor: simplify gitea config to use owner/repo format

- Remove separate GITEA_OWNER config, use owner/repo format everywhere
- Add _parse_repo() helper to extract owner and repo from combined string
- Update plugin.json schema: file -> source, author as object
- Remove redundant configuration section from cmdb-assistant plugin
- Simplify gitea_client.py by removing excessive docstrings

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2025-12-12 02:12:22 -05:00
parent b067802da8
commit 310bd34e82
8 changed files with 97 additions and 276 deletions

View File

@@ -21,7 +21,6 @@ class GiteaConfig:
def __init__(self): def __init__(self):
self.api_url: Optional[str] = None self.api_url: Optional[str] = None
self.api_token: Optional[str] = None self.api_token: Optional[str] = None
self.owner: Optional[str] = None
self.repo: Optional[str] = None self.repo: Optional[str] = None
self.mode: str = 'project' self.mode: str = 'project'
@@ -31,7 +30,7 @@ class GiteaConfig:
Project-level configuration overrides system-level. Project-level configuration overrides system-level.
Returns: Returns:
Dict containing api_url, api_token, owner, repo, mode Dict containing api_url, api_token, repo, mode
Raises: Raises:
FileNotFoundError: If system config is missing FileNotFoundError: If system config is missing
@@ -58,8 +57,7 @@ class GiteaConfig:
# Extract values # Extract values
self.api_url = os.getenv('GITEA_API_URL') self.api_url = os.getenv('GITEA_API_URL')
self.api_token = os.getenv('GITEA_API_TOKEN') self.api_token = os.getenv('GITEA_API_TOKEN')
self.owner = os.getenv('GITEA_OWNER') self.repo = os.getenv('GITEA_REPO') # Optional, must be owner/repo format
self.repo = os.getenv('GITEA_REPO') # Optional for PMO
# Detect mode # Detect mode
if self.repo: if self.repo:
@@ -75,7 +73,6 @@ class GiteaConfig:
return { return {
'api_url': self.api_url, 'api_url': self.api_url,
'api_token': self.api_token, 'api_token': self.api_token,
'owner': self.owner,
'repo': self.repo, 'repo': self.repo,
'mode': self.mode 'mode': self.mode
} }
@@ -89,8 +86,7 @@ class GiteaConfig:
""" """
required = { required = {
'GITEA_API_URL': self.api_url, 'GITEA_API_URL': self.api_url,
'GITEA_API_TOKEN': self.api_token, 'GITEA_API_TOKEN': self.api_token
'GITEA_OWNER': self.owner
} }
missing = [key for key, value in required.items() if not value] missing = [key for key, value in required.items() if not value]

View File

@@ -26,8 +26,7 @@ class GiteaClient:
self.base_url = config_dict['api_url'] self.base_url = config_dict['api_url']
self.token = config_dict['api_token'] self.token = config_dict['api_token']
self.owner = config_dict['owner'] self.repo = config_dict.get('repo') # Optional default repo in owner/repo format
self.repo = config_dict.get('repo') # Optional for PMO
self.mode = config_dict['mode'] self.mode = config_dict['mode']
self.session = requests.Session() self.session = requests.Session()
@@ -36,7 +35,15 @@ class GiteaClient:
'Content-Type': 'application/json' 'Content-Type': 'application/json'
}) })
logger.info(f"Gitea client initialized for {self.owner} in {self.mode} mode") logger.info(f"Gitea client initialized in {self.mode} mode")
def _parse_repo(self, repo: Optional[str] = None) -> tuple:
"""Parse owner/repo from input. Always requires 'owner/repo' format."""
target = repo or self.repo
if not target or '/' not in target:
raise ValueError("Use 'owner/repo' format (e.g. 'bandit/support-claude-mktplace')")
parts = target.split('/', 1)
return parts[0], parts[1]
def list_issues( def list_issues(
self, self,
@@ -50,26 +57,17 @@ class GiteaClient:
Args: Args:
state: Issue state (open, closed, all) state: Issue state (open, closed, all)
labels: Filter by labels labels: Filter by labels
repo: Override configured repo (for PMO multi-repo) repo: Repository in 'owner/repo' format
Returns: Returns:
List of issue dictionaries List of issue dictionaries
Raises:
ValueError: If repository not specified
requests.HTTPError: If API request fails
""" """
target_repo = repo or self.repo owner, target_repo = self._parse_repo(repo)
if not target_repo: url = f"{self.base_url}/repos/{owner}/{target_repo}/issues"
raise ValueError("Repository not specified")
url = f"{self.base_url}/repos/{self.owner}/{target_repo}/issues"
params = {'state': state} params = {'state': state}
if labels: if labels:
params['labels'] = ','.join(labels) params['labels'] = ','.join(labels)
logger.info(f"Listing issues from {owner}/{target_repo} with state={state}")
logger.info(f"Listing issues from {self.owner}/{target_repo} with state={state}")
response = self.session.get(url, params=params) response = self.session.get(url, params=params)
response.raise_for_status() response.raise_for_status()
return response.json() return response.json()
@@ -79,26 +77,10 @@ class GiteaClient:
issue_number: int, issue_number: int,
repo: Optional[str] = None repo: Optional[str] = None
) -> Dict: ) -> Dict:
""" """Get specific issue details."""
Get specific issue details. owner, target_repo = self._parse_repo(repo)
url = f"{self.base_url}/repos/{owner}/{target_repo}/issues/{issue_number}"
Args: logger.info(f"Getting issue #{issue_number} from {owner}/{target_repo}")
issue_number: Issue number
repo: Override configured repo (for PMO multi-repo)
Returns:
Issue dictionary
Raises:
ValueError: If repository not specified
requests.HTTPError: If API request fails
"""
target_repo = repo or self.repo
if not target_repo:
raise ValueError("Repository not specified")
url = f"{self.base_url}/repos/{self.owner}/{target_repo}/issues/{issue_number}"
logger.info(f"Getting issue #{issue_number} from {self.owner}/{target_repo}")
response = self.session.get(url) response = self.session.get(url)
response.raise_for_status() response.raise_for_status()
return response.json() return response.json()
@@ -110,69 +92,30 @@ class GiteaClient:
labels: Optional[List[str]] = None, labels: Optional[List[str]] = None,
repo: Optional[str] = None repo: Optional[str] = None
) -> Dict: ) -> Dict:
""" """Create a new issue in Gitea."""
Create a new issue in Gitea. owner, target_repo = self._parse_repo(repo)
url = f"{self.base_url}/repos/{owner}/{target_repo}/issues"
Args: data = {'title': title, 'body': body}
title: Issue title
body: Issue description
labels: List of label names (will be converted to IDs)
repo: Override configured repo (for PMO multi-repo)
Returns:
Created issue dictionary
Raises:
ValueError: If repository not specified
requests.HTTPError: If API request fails
"""
target_repo = repo or self.repo
if not target_repo:
raise ValueError("Repository not specified")
url = f"{self.base_url}/repos/{self.owner}/{target_repo}/issues"
data = {
'title': title,
'body': body
}
if labels: if labels:
# Convert label names to IDs (Gitea expects integer IDs, not strings) label_ids = self._resolve_label_ids(labels, owner, target_repo)
label_ids = self._resolve_label_ids(labels, target_repo)
data['labels'] = label_ids data['labels'] = label_ids
logger.info(f"Creating issue in {owner}/{target_repo}: {title}")
logger.info(f"Creating issue in {self.owner}/{target_repo}: {title}")
response = self.session.post(url, json=data) response = self.session.post(url, json=data)
response.raise_for_status() response.raise_for_status()
return response.json() return response.json()
def _resolve_label_ids(self, label_names: List[str], repo: str) -> List[int]: def _resolve_label_ids(self, label_names: List[str], owner: str, repo: str) -> List[int]:
""" """Convert label names to label IDs."""
Convert label names to label IDs. org_labels = self.get_org_labels(owner)
repo_labels = self.get_labels(f"{owner}/{repo}")
Args:
label_names: List of label names (e.g., ['Type/Feature', 'Priority/High'])
repo: Repository name
Returns:
List of label IDs
"""
# Fetch all available labels (org + repo)
org_labels = self.get_org_labels()
repo_labels = self.get_labels(repo)
all_labels = org_labels + repo_labels all_labels = org_labels + repo_labels
# Build name -> ID mapping
label_map = {label['name']: label['id'] for label in all_labels} label_map = {label['name']: label['id'] for label in all_labels}
# Resolve IDs
label_ids = [] label_ids = []
for name in label_names: for name in label_names:
if name in label_map: if name in label_map:
label_ids.append(label_map[name]) label_ids.append(label_map[name])
else: else:
logger.warning(f"Label '{name}' not found in Gitea, skipping") logger.warning(f"Label '{name}' not found, skipping")
return label_ids return label_ids
def update_issue( def update_issue(
@@ -184,31 +127,10 @@ class GiteaClient:
labels: Optional[List[str]] = None, labels: Optional[List[str]] = None,
repo: Optional[str] = None repo: Optional[str] = None
) -> Dict: ) -> Dict:
""" """Update existing issue. Repo must be 'owner/repo' format."""
Update existing issue. owner, target_repo = self._parse_repo(repo)
url = f"{self.base_url}/repos/{owner}/{target_repo}/issues/{issue_number}"
Args:
issue_number: Issue number
title: New title (optional)
body: New body (optional)
state: New state - 'open' or 'closed' (optional)
labels: New labels (optional)
repo: Override configured repo (for PMO multi-repo)
Returns:
Updated issue dictionary
Raises:
ValueError: If repository not specified
requests.HTTPError: If API request fails
"""
target_repo = repo or self.repo
if not target_repo:
raise ValueError("Repository not specified")
url = f"{self.base_url}/repos/{self.owner}/{target_repo}/issues/{issue_number}"
data = {} data = {}
if title is not None: if title is not None:
data['title'] = title data['title'] = title
if body is not None: if body is not None:
@@ -217,8 +139,7 @@ class GiteaClient:
data['state'] = state data['state'] = state
if labels is not None: if labels is not None:
data['labels'] = labels data['labels'] = labels
logger.info(f"Updating issue #{issue_number} in {owner}/{target_repo}")
logger.info(f"Updating issue #{issue_number} in {self.owner}/{target_repo}")
response = self.session.patch(url, json=data) response = self.session.patch(url, json=data)
response.raise_for_status() response.raise_for_status()
return response.json() return response.json()
@@ -229,131 +150,62 @@ class GiteaClient:
comment: str, comment: str,
repo: Optional[str] = None repo: Optional[str] = None
) -> Dict: ) -> Dict:
""" """Add comment to issue. Repo must be 'owner/repo' format."""
Add comment to issue. owner, target_repo = self._parse_repo(repo)
url = f"{self.base_url}/repos/{owner}/{target_repo}/issues/{issue_number}/comments"
Args:
issue_number: Issue number
comment: Comment text
repo: Override configured repo (for PMO multi-repo)
Returns:
Created comment dictionary
Raises:
ValueError: If repository not specified
requests.HTTPError: If API request fails
"""
target_repo = repo or self.repo
if not target_repo:
raise ValueError("Repository not specified")
url = f"{self.base_url}/repos/{self.owner}/{target_repo}/issues/{issue_number}/comments"
data = {'body': comment} data = {'body': comment}
logger.info(f"Adding comment to issue #{issue_number} in {owner}/{target_repo}")
logger.info(f"Adding comment to issue #{issue_number} in {self.owner}/{target_repo}")
response = self.session.post(url, json=data) response = self.session.post(url, json=data)
response.raise_for_status() response.raise_for_status()
return response.json() return response.json()
def get_labels( def get_labels(self, repo: Optional[str] = None) -> List[Dict]:
self, """Get all labels from repository. Repo must be 'owner/repo' format."""
repo: Optional[str] = None owner, target_repo = self._parse_repo(repo)
) -> List[Dict]: url = f"{self.base_url}/repos/{owner}/{target_repo}/labels"
""" logger.info(f"Getting labels from {owner}/{target_repo}")
Get all labels from repository.
Args:
repo: Override configured repo (for PMO multi-repo)
Returns:
List of label dictionaries
Raises:
ValueError: If repository not specified
requests.HTTPError: If API request fails
"""
target_repo = repo or self.repo
if not target_repo:
raise ValueError("Repository not specified")
url = f"{self.base_url}/repos/{self.owner}/{target_repo}/labels"
logger.info(f"Getting labels from {self.owner}/{target_repo}")
response = self.session.get(url) response = self.session.get(url)
response.raise_for_status() response.raise_for_status()
return response.json() return response.json()
def get_org_labels(self) -> List[Dict]: def get_org_labels(self, org: str) -> List[Dict]:
""" """Get organization-level labels. Org is the organization name."""
Get organization-level labels. url = f"{self.base_url}/orgs/{org}/labels"
logger.info(f"Getting organization labels for {org}")
Returns:
List of organization label dictionaries
Raises:
requests.HTTPError: If API request fails
"""
url = f"{self.base_url}/orgs/{self.owner}/labels"
logger.info(f"Getting organization labels for {self.owner}")
response = self.session.get(url) response = self.session.get(url)
response.raise_for_status() response.raise_for_status()
return response.json() return response.json()
# PMO-specific methods def list_repos(self, org: str) -> List[Dict]:
"""List all repositories in organization. Org is the organization name."""
def list_repos(self) -> List[Dict]: url = f"{self.base_url}/orgs/{org}/repos"
""" logger.info(f"Listing all repositories for organization {org}")
List all repositories in organization (PMO mode).
Returns:
List of repository dictionaries
Raises:
requests.HTTPError: If API request fails
"""
url = f"{self.base_url}/orgs/{self.owner}/repos"
logger.info(f"Listing all repositories for organization {self.owner}")
response = self.session.get(url) response = self.session.get(url)
response.raise_for_status() response.raise_for_status()
return response.json() return response.json()
def aggregate_issues( def aggregate_issues(
self, self,
org: str,
state: str = 'open', state: str = 'open',
labels: Optional[List[str]] = None labels: Optional[List[str]] = None
) -> Dict[str, List[Dict]]: ) -> Dict[str, List[Dict]]:
""" """Fetch issues across all repositories in org."""
Fetch issues across all repositories (PMO mode). repos = self.list_repos(org)
Returns dict keyed by repository name.
Args:
state: Issue state (open, closed, all)
labels: Filter by labels
Returns:
Dictionary mapping repository names to issue lists
Raises:
requests.HTTPError: If API request fails
"""
repos = self.list_repos()
aggregated = {} aggregated = {}
logger.info(f"Aggregating issues across {len(repos)} repositories") logger.info(f"Aggregating issues across {len(repos)} repositories")
for repo in repos: for repo in repos:
repo_name = repo['name'] repo_name = repo['name']
try: try:
issues = self.list_issues( issues = self.list_issues(
state=state, state=state,
labels=labels, labels=labels,
repo=repo_name repo=f"{org}/{repo_name}"
) )
if issues: if issues:
aggregated[repo_name] = issues aggregated[repo_name] = issues
logger.info(f"Found {len(issues)} issues in {repo_name}") logger.info(f"Found {len(issues)} issues in {repo_name}")
except Exception as e: except Exception as e:
# Log error but continue with other repos
logger.error(f"Error fetching issues from {repo_name}: {e}") logger.error(f"Error fetching issues from {repo_name}: {e}")
return aggregated return aggregated

View File

@@ -15,7 +15,10 @@ from .gitea_client import GiteaClient
from .tools.issues import IssueTools from .tools.issues import IssueTools
from .tools.labels import LabelTools from .tools.labels import LabelTools
# Suppress noisy MCP validation warnings on stderr
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
logging.getLogger("root").setLevel(logging.ERROR)
logging.getLogger("mcp").setLevel(logging.ERROR)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -216,6 +219,10 @@ class GiteaMCPServer:
inputSchema={ inputSchema={
"type": "object", "type": "object",
"properties": { "properties": {
"org": {
"type": "string",
"description": "Organization name (e.g. 'bandit')"
},
"state": { "state": {
"type": "string", "type": "string",
"enum": ["open", "closed", "all"], "enum": ["open", "closed", "all"],
@@ -227,7 +234,8 @@ class GiteaMCPServer:
"items": {"type": "string"}, "items": {"type": "string"},
"description": "Filter by labels" "description": "Filter by labels"
} }
} },
"required": ["org"]
} }
) )
] ]

View File

@@ -245,35 +245,17 @@ class IssueTools:
async def aggregate_issues( async def aggregate_issues(
self, self,
org: str,
state: str = 'open', state: str = 'open',
labels: Optional[List[str]] = None labels: Optional[List[str]] = None
) -> Dict[str, List[Dict]]: ) -> Dict[str, List[Dict]]:
""" """Aggregate issues across all repositories in org."""
Aggregate issues across all repositories (PMO mode, async wrapper).
Args:
state: Issue state (open, closed, all)
labels: Filter by labels
Returns:
Dictionary mapping repository names to issue lists
Raises:
ValueError: If not in company mode
PermissionError: If operation not allowed on current branch
"""
if self.gitea.mode != 'company':
raise ValueError("aggregate_issues only available in company mode")
if not self._check_branch_permissions('aggregate_issues'): if not self._check_branch_permissions('aggregate_issues'):
branch = self._get_current_branch() branch = self._get_current_branch()
raise PermissionError( raise PermissionError(f"Cannot aggregate issues on branch '{branch}'.")
f"Cannot aggregate issues on branch '{branch}'. "
f"Switch to a development branch."
)
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
return await loop.run_in_executor( return await loop.run_in_executor(
None, None,
lambda: self.gitea.aggregate_issues(state, labels) lambda: self.gitea.aggregate_issues(org, state, labels)
) )

View File

@@ -27,31 +27,24 @@ class LabelTools:
self.gitea = gitea_client self.gitea = gitea_client
async def get_labels(self, repo: Optional[str] = None) -> Dict[str, List[Dict]]: async def get_labels(self, repo: Optional[str] = None) -> Dict[str, List[Dict]]:
""" """Get all labels (org + repo). Repo must be 'owner/repo' format."""
Get all labels (org + repo) (async wrapper).
Args:
repo: Override configured repo (for PMO multi-repo)
Returns:
Dictionary with 'org' and 'repo' label lists
"""
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
# Get org labels target_repo = repo or self.gitea.repo
if not target_repo or '/' not in target_repo:
raise ValueError("Use 'owner/repo' format (e.g. 'bandit/support-claude-mktplace')")
org = target_repo.split('/')[0]
org_labels = await loop.run_in_executor( org_labels = await loop.run_in_executor(
None, None,
self.gitea.get_org_labels lambda: self.gitea.get_org_labels(org)
) )
# Get repo labels if repo is specified repo_labels = await loop.run_in_executor(
repo_labels = [] None,
if repo or self.gitea.repo: lambda: self.gitea.get_labels(target_repo)
target_repo = repo or self.gitea.repo )
repo_labels = await loop.run_in_executor(
None,
lambda: self.gitea.get_labels(target_repo)
)
return { return {
'organization': org_labels, 'organization': org_labels,

View File

@@ -23,7 +23,10 @@ from .tools.vpn import VPNTools
from .tools.wireless import WirelessTools from .tools.wireless import WirelessTools
from .tools.extras import ExtrasTools from .tools.extras import ExtrasTools
# Suppress noisy MCP validation warnings on stderr
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
logging.getLogger("root").setLevel(logging.ERROR)
logging.getLogger("mcp").setLevel(logging.ERROR)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)

View File

@@ -13,7 +13,10 @@ from mcp.types import Tool, TextContent
from .config import WikiJSConfig from .config import WikiJSConfig
from .wikijs_client import WikiJSClient from .wikijs_client import WikiJSClient
# Suppress noisy MCP validation warnings on stderr
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
logging.getLogger("root").setLevel(logging.ERROR)
logging.getLogger("mcp").setLevel(logging.ERROR)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)

View File

@@ -2,7 +2,10 @@
"name": "cmdb-assistant", "name": "cmdb-assistant",
"version": "1.0.0", "version": "1.0.0",
"description": "NetBox CMDB integration for infrastructure management - query, create, update, and manage network devices, IP addresses, sites, and more", "description": "NetBox CMDB integration for infrastructure management - query, create, update, and manage network devices, IP addresses, sites, and more",
"author": "Bandit Labs", "author": {
"name": "Bandit Labs",
"email": "dev@banditlabs.io"
},
"homepage": "https://github.com/bandit-labs/cmdb-assistant", "homepage": "https://github.com/bandit-labs/cmdb-assistant",
"license": "MIT", "license": "MIT",
"keywords": [ "keywords": [
@@ -16,44 +19,25 @@
"commands": { "commands": {
"cmdb-search": { "cmdb-search": {
"description": "Search NetBox for devices, IPs, sites, or any CMDB object", "description": "Search NetBox for devices, IPs, sites, or any CMDB object",
"file": "commands/cmdb-search.md" "source": "commands/cmdb-search.md"
}, },
"cmdb-device": { "cmdb-device": {
"description": "Manage network devices (create, view, update, delete)", "description": "Manage network devices (create, view, update, delete)",
"file": "commands/cmdb-device.md" "source": "commands/cmdb-device.md"
}, },
"cmdb-ip": { "cmdb-ip": {
"description": "Manage IP addresses and prefixes", "description": "Manage IP addresses and prefixes",
"file": "commands/cmdb-ip.md" "source": "commands/cmdb-ip.md"
}, },
"cmdb-site": { "cmdb-site": {
"description": "Manage sites and locations", "description": "Manage sites and locations",
"file": "commands/cmdb-site.md" "source": "commands/cmdb-site.md"
} }
}, },
"agents": { "agents": {
"cmdb-assistant": { "cmdb-assistant": {
"description": "Infrastructure management assistant for NetBox CMDB operations", "description": "Infrastructure management assistant for NetBox CMDB operations",
"file": "agents/cmdb-assistant.md" "source": "agents/cmdb-assistant.md"
} }
},
"configuration": {
"required": [
{
"name": "NETBOX_URL",
"description": "NetBox instance URL (e.g., https://netbox.example.com)"
},
{
"name": "NETBOX_TOKEN",
"description": "NetBox API token for authentication"
}
],
"optional": [
{
"name": "NETBOX_VERIFY_SSL",
"description": "Verify SSL certificates (default: true)",
"default": "true"
}
]
} }
} }