Add report generation and issue listing tools: - generate_compatibility_report: Full marketplace validation with markdown or JSON output, includes summary statistics - list_issues: Filtered issue listing by severity and type The report tools coordinate parse_tools and validation_tools to scan all plugins in a marketplace, run pairwise compatibility checks, and aggregate findings into comprehensive reports. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
338 lines
12 KiB
Python
338 lines
12 KiB
Python
"""
|
|
Report tools for generating compatibility reports and listing issues.
|
|
|
|
Provides:
|
|
- generate_compatibility_report: Full marketplace validation report
|
|
- list_issues: Filtered issue listing
|
|
"""
|
|
import os
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
from typing import Optional
|
|
from pydantic import BaseModel
|
|
|
|
from .parse_tools import ParseTools
|
|
from .validation_tools import ValidationTools, IssueSeverity, IssueType, ValidationIssue
|
|
|
|
|
|
class ReportSummary(BaseModel):
|
|
"""Summary statistics for a report"""
|
|
total_plugins: int = 0
|
|
total_commands: int = 0
|
|
total_agents: int = 0
|
|
total_tools: int = 0
|
|
total_issues: int = 0
|
|
errors: int = 0
|
|
warnings: int = 0
|
|
info: int = 0
|
|
|
|
|
|
class ReportTools:
|
|
"""Tools for generating reports and listing issues"""
|
|
|
|
def __init__(self):
|
|
self.parse_tools = ParseTools()
|
|
self.validation_tools = ValidationTools()
|
|
|
|
async def generate_compatibility_report(
|
|
self,
|
|
marketplace_path: str,
|
|
format: str = "markdown"
|
|
) -> dict:
|
|
"""
|
|
Generate a comprehensive compatibility report for all plugins.
|
|
|
|
Args:
|
|
marketplace_path: Path to marketplace root directory
|
|
format: Output format ("markdown" or "json")
|
|
|
|
Returns:
|
|
Full compatibility report with all findings
|
|
"""
|
|
marketplace = Path(marketplace_path)
|
|
plugins_dir = marketplace / "plugins"
|
|
|
|
if not plugins_dir.exists():
|
|
return {
|
|
"error": f"Plugins directory not found at {plugins_dir}",
|
|
"marketplace_path": marketplace_path
|
|
}
|
|
|
|
# Discover all plugins
|
|
plugins = []
|
|
for item in plugins_dir.iterdir():
|
|
if item.is_dir() and (item / ".claude-plugin").exists():
|
|
plugins.append(item)
|
|
|
|
if not plugins:
|
|
return {
|
|
"error": "No plugins found in marketplace",
|
|
"marketplace_path": marketplace_path
|
|
}
|
|
|
|
# Parse all plugin interfaces
|
|
interfaces = {}
|
|
all_issues = []
|
|
summary = ReportSummary(total_plugins=len(plugins))
|
|
|
|
for plugin_path in plugins:
|
|
interface = await self.parse_tools.parse_plugin_interface(str(plugin_path))
|
|
if "error" not in interface:
|
|
interfaces[interface["plugin_name"]] = interface
|
|
summary.total_commands += len(interface.get("commands", []))
|
|
summary.total_agents += len(interface.get("agents", []))
|
|
summary.total_tools += len(interface.get("tools", []))
|
|
|
|
# Run pairwise compatibility checks
|
|
plugin_names = list(interfaces.keys())
|
|
compatibility_results = []
|
|
|
|
for i, name_a in enumerate(plugin_names):
|
|
for name_b in plugin_names[i+1:]:
|
|
path_a = plugins_dir / self._find_plugin_dir(plugins_dir, name_a)
|
|
path_b = plugins_dir / self._find_plugin_dir(plugins_dir, name_b)
|
|
|
|
result = await self.validation_tools.validate_compatibility(
|
|
str(path_a), str(path_b)
|
|
)
|
|
|
|
if "error" not in result:
|
|
compatibility_results.append(result)
|
|
all_issues.extend(result.get("issues", []))
|
|
|
|
# Parse CLAUDE.md if exists
|
|
claude_md = marketplace / "CLAUDE.md"
|
|
agents_from_claude = []
|
|
if claude_md.exists():
|
|
agents_result = await self.parse_tools.parse_claude_md_agents(str(claude_md))
|
|
if "error" not in agents_result:
|
|
agents_from_claude = agents_result.get("agents", [])
|
|
|
|
# Validate each agent
|
|
for agent in agents_from_claude:
|
|
agent_result = await self.validation_tools.validate_agent_refs(
|
|
agent["name"],
|
|
str(claude_md),
|
|
[str(p) for p in plugins]
|
|
)
|
|
if "error" not in agent_result:
|
|
all_issues.extend(agent_result.get("issues", []))
|
|
|
|
# Count issues by severity
|
|
for issue in all_issues:
|
|
severity = issue.get("severity", "info")
|
|
if isinstance(severity, str):
|
|
severity_str = severity.lower()
|
|
else:
|
|
severity_str = severity.value if hasattr(severity, 'value') else str(severity).lower()
|
|
|
|
if "error" in severity_str:
|
|
summary.errors += 1
|
|
elif "warning" in severity_str:
|
|
summary.warnings += 1
|
|
else:
|
|
summary.info += 1
|
|
|
|
summary.total_issues = len(all_issues)
|
|
|
|
# Generate report
|
|
if format == "json":
|
|
return {
|
|
"generated_at": datetime.now().isoformat(),
|
|
"marketplace_path": marketplace_path,
|
|
"summary": summary.model_dump(),
|
|
"plugins": interfaces,
|
|
"compatibility_checks": compatibility_results,
|
|
"claude_md_agents": agents_from_claude,
|
|
"all_issues": all_issues
|
|
}
|
|
else:
|
|
# Generate markdown report
|
|
report = self._generate_markdown_report(
|
|
marketplace_path,
|
|
summary,
|
|
interfaces,
|
|
compatibility_results,
|
|
agents_from_claude,
|
|
all_issues
|
|
)
|
|
return {
|
|
"generated_at": datetime.now().isoformat(),
|
|
"marketplace_path": marketplace_path,
|
|
"summary": summary.model_dump(),
|
|
"report": report
|
|
}
|
|
|
|
def _find_plugin_dir(self, plugins_dir: Path, plugin_name: str) -> str:
|
|
"""Find plugin directory by name (handles naming variations)"""
|
|
# Try exact match first
|
|
for item in plugins_dir.iterdir():
|
|
if item.is_dir():
|
|
if item.name.lower() == plugin_name.lower():
|
|
return item.name
|
|
# Check plugin.json for name
|
|
plugin_json = item / ".claude-plugin" / "plugin.json"
|
|
if plugin_json.exists():
|
|
import json
|
|
try:
|
|
data = json.loads(plugin_json.read_text())
|
|
if data.get("name", "").lower() == plugin_name.lower():
|
|
return item.name
|
|
except:
|
|
pass
|
|
return plugin_name
|
|
|
|
def _generate_markdown_report(
|
|
self,
|
|
marketplace_path: str,
|
|
summary: ReportSummary,
|
|
interfaces: dict,
|
|
compatibility_results: list,
|
|
agents: list,
|
|
issues: list
|
|
) -> str:
|
|
"""Generate markdown formatted report"""
|
|
lines = [
|
|
"# Contract Validation Report",
|
|
"",
|
|
f"**Generated:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
|
|
f"**Marketplace:** `{marketplace_path}`",
|
|
"",
|
|
"## Summary",
|
|
"",
|
|
f"| Metric | Count |",
|
|
f"|--------|-------|",
|
|
f"| Plugins | {summary.total_plugins} |",
|
|
f"| Commands | {summary.total_commands} |",
|
|
f"| Agents | {summary.total_agents} |",
|
|
f"| Tools | {summary.total_tools} |",
|
|
f"| **Issues** | **{summary.total_issues}** |",
|
|
f"| - Errors | {summary.errors} |",
|
|
f"| - Warnings | {summary.warnings} |",
|
|
f"| - Info | {summary.info} |",
|
|
"",
|
|
]
|
|
|
|
# Plugin details
|
|
lines.extend([
|
|
"## Plugins",
|
|
"",
|
|
])
|
|
|
|
for name, interface in interfaces.items():
|
|
cmds = len(interface.get("commands", []))
|
|
agents_count = len(interface.get("agents", []))
|
|
tools = len(interface.get("tools", []))
|
|
lines.append(f"### {name}")
|
|
lines.append("")
|
|
lines.append(f"- Commands: {cmds}")
|
|
lines.append(f"- Agents: {agents_count}")
|
|
lines.append(f"- Tools: {tools}")
|
|
lines.append("")
|
|
|
|
# Compatibility results
|
|
if compatibility_results:
|
|
lines.extend([
|
|
"## Compatibility Checks",
|
|
"",
|
|
])
|
|
|
|
for result in compatibility_results:
|
|
status = "✓" if result.get("compatible", True) else "✗"
|
|
lines.append(f"### {result['plugin_a']} ↔ {result['plugin_b']} {status}")
|
|
lines.append("")
|
|
|
|
if result.get("shared_tools"):
|
|
lines.append(f"- Shared tools: `{', '.join(result['shared_tools'])}`")
|
|
if result.get("issues"):
|
|
for issue in result["issues"]:
|
|
sev = issue.get("severity", "info")
|
|
if hasattr(sev, 'value'):
|
|
sev = sev.value
|
|
lines.append(f"- [{sev.upper()}] {issue['message']}")
|
|
lines.append("")
|
|
|
|
# Issues section
|
|
if issues:
|
|
lines.extend([
|
|
"## All Issues",
|
|
"",
|
|
"| Severity | Type | Message |",
|
|
"|----------|------|---------|",
|
|
])
|
|
|
|
for issue in issues:
|
|
sev = issue.get("severity", "info")
|
|
itype = issue.get("issue_type", "unknown")
|
|
msg = issue.get("message", "")
|
|
|
|
if hasattr(sev, 'value'):
|
|
sev = sev.value
|
|
if hasattr(itype, 'value'):
|
|
itype = itype.value
|
|
|
|
# Truncate message for table
|
|
msg_short = msg[:60] + "..." if len(msg) > 60 else msg
|
|
lines.append(f"| {sev} | {itype} | {msg_short} |")
|
|
|
|
lines.append("")
|
|
|
|
return "\n".join(lines)
|
|
|
|
async def list_issues(
|
|
self,
|
|
marketplace_path: str,
|
|
severity: str = "all",
|
|
issue_type: str = "all"
|
|
) -> dict:
|
|
"""
|
|
List validation issues with optional filtering.
|
|
|
|
Args:
|
|
marketplace_path: Path to marketplace root directory
|
|
severity: Filter by severity ("error", "warning", "info", "all")
|
|
issue_type: Filter by type ("missing_tool", "interface_mismatch", etc., "all")
|
|
|
|
Returns:
|
|
Filtered list of issues
|
|
"""
|
|
# Generate full report first
|
|
report = await self.generate_compatibility_report(marketplace_path, format="json")
|
|
|
|
if "error" in report:
|
|
return report
|
|
|
|
all_issues = report.get("all_issues", [])
|
|
|
|
# Filter by severity
|
|
if severity != "all":
|
|
filtered = []
|
|
for issue in all_issues:
|
|
issue_sev = issue.get("severity", "info")
|
|
if hasattr(issue_sev, 'value'):
|
|
issue_sev = issue_sev.value
|
|
if isinstance(issue_sev, str) and severity.lower() in issue_sev.lower():
|
|
filtered.append(issue)
|
|
all_issues = filtered
|
|
|
|
# Filter by type
|
|
if issue_type != "all":
|
|
filtered = []
|
|
for issue in all_issues:
|
|
itype = issue.get("issue_type", "unknown")
|
|
if hasattr(itype, 'value'):
|
|
itype = itype.value
|
|
if isinstance(itype, str) and issue_type.lower() in itype.lower():
|
|
filtered.append(issue)
|
|
all_issues = filtered
|
|
|
|
return {
|
|
"marketplace_path": marketplace_path,
|
|
"filters": {
|
|
"severity": severity,
|
|
"issue_type": issue_type
|
|
},
|
|
"total_issues": len(all_issues),
|
|
"issues": all_issues
|
|
}
|