Merge feat/188-report-tools: implement report tools
This commit is contained in:
3
mcp-servers/contract-validator/mcp_server/__init__.py
Normal file
3
mcp-servers/contract-validator/mcp_server/__init__.py
Normal file
@@ -0,0 +1,3 @@
|
||||
"""Contract Validator MCP Server - Cross-plugin compatibility validation."""
|
||||
|
||||
__version__ = "1.0.0"
|
||||
415
mcp-servers/contract-validator/mcp_server/parse_tools.py
Normal file
415
mcp-servers/contract-validator/mcp_server/parse_tools.py
Normal file
@@ -0,0 +1,415 @@
|
||||
"""
|
||||
Parse tools for extracting interfaces from plugin documentation.
|
||||
|
||||
Provides structured extraction of:
|
||||
- Plugin interfaces from README.md (commands, agents, tools)
|
||||
- Agent definitions from CLAUDE.md (tool sequences, workflows)
|
||||
"""
|
||||
import re
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
class ToolInfo(BaseModel):
|
||||
"""Information about a single tool"""
|
||||
name: str
|
||||
category: Optional[str] = None
|
||||
description: Optional[str] = None
|
||||
|
||||
|
||||
class CommandInfo(BaseModel):
|
||||
"""Information about a plugin command"""
|
||||
name: str
|
||||
description: Optional[str] = None
|
||||
|
||||
|
||||
class AgentInfo(BaseModel):
|
||||
"""Information about a plugin agent"""
|
||||
name: str
|
||||
description: Optional[str] = None
|
||||
tools: list[str] = []
|
||||
|
||||
|
||||
class PluginInterface(BaseModel):
|
||||
"""Structured plugin interface extracted from README"""
|
||||
plugin_name: str
|
||||
description: Optional[str] = None
|
||||
commands: list[CommandInfo] = []
|
||||
agents: list[AgentInfo] = []
|
||||
tools: list[ToolInfo] = []
|
||||
tool_categories: dict[str, list[str]] = {}
|
||||
features: list[str] = []
|
||||
|
||||
|
||||
class ClaudeMdAgent(BaseModel):
|
||||
"""Agent definition extracted from CLAUDE.md"""
|
||||
name: str
|
||||
personality: Optional[str] = None
|
||||
responsibilities: list[str] = []
|
||||
tool_refs: list[str] = []
|
||||
workflow_steps: list[str] = []
|
||||
|
||||
|
||||
class ParseTools:
|
||||
"""Tools for parsing plugin documentation"""
|
||||
|
||||
async def parse_plugin_interface(self, plugin_path: str) -> dict:
|
||||
"""
|
||||
Parse plugin README.md to extract interface declarations.
|
||||
|
||||
Args:
|
||||
plugin_path: Path to plugin directory or README.md file
|
||||
|
||||
Returns:
|
||||
Structured interface with commands, agents, tools, etc.
|
||||
"""
|
||||
# Resolve path to README
|
||||
path = Path(plugin_path)
|
||||
if path.is_dir():
|
||||
readme_path = path / "README.md"
|
||||
else:
|
||||
readme_path = path
|
||||
|
||||
if not readme_path.exists():
|
||||
return {
|
||||
"error": f"README.md not found at {readme_path}",
|
||||
"plugin_path": plugin_path
|
||||
}
|
||||
|
||||
content = readme_path.read_text()
|
||||
plugin_name = self._extract_plugin_name(content, path)
|
||||
|
||||
interface = PluginInterface(
|
||||
plugin_name=plugin_name,
|
||||
description=self._extract_description(content),
|
||||
commands=self._extract_commands(content),
|
||||
agents=self._extract_agents_from_readme(content),
|
||||
tools=self._extract_tools(content),
|
||||
tool_categories=self._extract_tool_categories(content),
|
||||
features=self._extract_features(content)
|
||||
)
|
||||
|
||||
return interface.model_dump()
|
||||
|
||||
async def parse_claude_md_agents(self, claude_md_path: str) -> dict:
|
||||
"""
|
||||
Parse CLAUDE.md to extract agent definitions and tool sequences.
|
||||
|
||||
Args:
|
||||
claude_md_path: Path to CLAUDE.md file
|
||||
|
||||
Returns:
|
||||
List of agents with their tool sequences
|
||||
"""
|
||||
path = Path(claude_md_path)
|
||||
|
||||
if not path.exists():
|
||||
return {
|
||||
"error": f"CLAUDE.md not found at {path}",
|
||||
"claude_md_path": claude_md_path
|
||||
}
|
||||
|
||||
content = path.read_text()
|
||||
agents = self._extract_agents_from_claude_md(content)
|
||||
|
||||
return {
|
||||
"file": str(path),
|
||||
"agents": [a.model_dump() for a in agents],
|
||||
"agent_count": len(agents)
|
||||
}
|
||||
|
||||
def _extract_plugin_name(self, content: str, path: Path) -> str:
|
||||
"""Extract plugin name from content or path"""
|
||||
# Try to get from H1 header
|
||||
match = re.search(r'^#\s+(.+?)(?:\s+Plugin|\s*$)', content, re.MULTILINE)
|
||||
if match:
|
||||
name = match.group(1).strip()
|
||||
# Handle cases like "# data-platform Plugin"
|
||||
name = re.sub(r'\s*Plugin\s*$', '', name, flags=re.IGNORECASE)
|
||||
return name
|
||||
|
||||
# Fall back to directory name
|
||||
if path.is_dir():
|
||||
return path.name
|
||||
return path.parent.name
|
||||
|
||||
def _extract_description(self, content: str) -> Optional[str]:
|
||||
"""Extract plugin description from first paragraph after title"""
|
||||
# Get content after H1, before first H2
|
||||
match = re.search(r'^#\s+.+?\n\n(.+?)(?=\n##|\n\n##|\Z)', content, re.MULTILINE | re.DOTALL)
|
||||
if match:
|
||||
desc = match.group(1).strip()
|
||||
# Take first paragraph only
|
||||
desc = desc.split('\n\n')[0].strip()
|
||||
return desc
|
||||
return None
|
||||
|
||||
def _extract_commands(self, content: str) -> list[CommandInfo]:
|
||||
"""Extract commands from Commands section"""
|
||||
commands = []
|
||||
|
||||
# Find Commands section
|
||||
commands_section = self._extract_section(content, "Commands")
|
||||
if not commands_section:
|
||||
return commands
|
||||
|
||||
# Parse table format: | Command | Description |
|
||||
# Only match actual command names (start with / or alphanumeric)
|
||||
table_pattern = r'\|\s*`?(/[a-z][-a-z0-9]*)`?\s*\|\s*([^|]+)\s*\|'
|
||||
for match in re.finditer(table_pattern, commands_section):
|
||||
cmd_name = match.group(1).strip()
|
||||
desc = match.group(2).strip()
|
||||
|
||||
# Skip header row and separators
|
||||
if cmd_name.lower() in ('command', 'commands') or cmd_name.startswith('-'):
|
||||
continue
|
||||
|
||||
commands.append(CommandInfo(
|
||||
name=cmd_name,
|
||||
description=desc
|
||||
))
|
||||
|
||||
# Also look for ### `/command-name` format (with backticks)
|
||||
cmd_header_pattern = r'^###\s+`(/[a-z][-a-z0-9]*)`\s*\n(.+?)(?=\n###|\n##|\Z)'
|
||||
for match in re.finditer(cmd_header_pattern, commands_section, re.MULTILINE | re.DOTALL):
|
||||
cmd_name = match.group(1).strip()
|
||||
desc_block = match.group(2).strip()
|
||||
# Get first line or paragraph as description
|
||||
desc = desc_block.split('\n')[0].strip()
|
||||
|
||||
# Don't duplicate if already found in table
|
||||
if not any(c.name == cmd_name for c in commands):
|
||||
commands.append(CommandInfo(name=cmd_name, description=desc))
|
||||
|
||||
# Also look for ### /command-name format (without backticks)
|
||||
cmd_header_pattern2 = r'^###\s+(/[a-z][-a-z0-9]*)\s*\n(.+?)(?=\n###|\n##|\Z)'
|
||||
for match in re.finditer(cmd_header_pattern2, commands_section, re.MULTILINE | re.DOTALL):
|
||||
cmd_name = match.group(1).strip()
|
||||
desc_block = match.group(2).strip()
|
||||
# Get first line or paragraph as description
|
||||
desc = desc_block.split('\n')[0].strip()
|
||||
|
||||
# Don't duplicate if already found in table
|
||||
if not any(c.name == cmd_name for c in commands):
|
||||
commands.append(CommandInfo(name=cmd_name, description=desc))
|
||||
|
||||
return commands
|
||||
|
||||
def _extract_agents_from_readme(self, content: str) -> list[AgentInfo]:
|
||||
"""Extract agents from Agents section in README"""
|
||||
agents = []
|
||||
|
||||
# Find Agents section
|
||||
agents_section = self._extract_section(content, "Agents")
|
||||
if not agents_section:
|
||||
return agents
|
||||
|
||||
# Parse table format: | Agent | Description |
|
||||
# Only match actual agent names (alphanumeric with dashes/underscores)
|
||||
table_pattern = r'\|\s*`?([a-z][-a-z0-9_]*)`?\s*\|\s*([^|]+)\s*\|'
|
||||
for match in re.finditer(table_pattern, agents_section):
|
||||
agent_name = match.group(1).strip()
|
||||
desc = match.group(2).strip()
|
||||
|
||||
# Skip header row and separators
|
||||
if agent_name.lower() in ('agent', 'agents') or agent_name.startswith('-'):
|
||||
continue
|
||||
|
||||
agents.append(AgentInfo(name=agent_name, description=desc))
|
||||
|
||||
return agents
|
||||
|
||||
def _extract_tools(self, content: str) -> list[ToolInfo]:
|
||||
"""Extract tool list from Tools Summary or similar section"""
|
||||
tools = []
|
||||
|
||||
# Find Tools Summary section
|
||||
tools_section = self._extract_section(content, "Tools Summary")
|
||||
if not tools_section:
|
||||
tools_section = self._extract_section(content, "Tools")
|
||||
if not tools_section:
|
||||
tools_section = self._extract_section(content, "MCP Server Tools")
|
||||
|
||||
if not tools_section:
|
||||
return tools
|
||||
|
||||
# Parse category headers: ### category (N tools)
|
||||
category_pattern = r'###\s*(.+?)\s*(?:\((\d+)\s*tools?\))?\s*\n([^#]+)'
|
||||
for match in re.finditer(category_pattern, tools_section):
|
||||
category = match.group(1).strip()
|
||||
tool_list_text = match.group(3).strip()
|
||||
|
||||
# Extract tool names from backtick lists
|
||||
tool_names = re.findall(r'`([a-z_]+)`', tool_list_text)
|
||||
for name in tool_names:
|
||||
tools.append(ToolInfo(name=name, category=category))
|
||||
|
||||
# Also look for inline tool lists without categories
|
||||
inline_pattern = r'`([a-z_]+)`'
|
||||
all_tool_names = set(t.name for t in tools)
|
||||
for match in re.finditer(inline_pattern, tools_section):
|
||||
name = match.group(1)
|
||||
if name not in all_tool_names:
|
||||
tools.append(ToolInfo(name=name))
|
||||
all_tool_names.add(name)
|
||||
|
||||
return tools
|
||||
|
||||
def _extract_tool_categories(self, content: str) -> dict[str, list[str]]:
|
||||
"""Extract tool categories with their tool lists"""
|
||||
categories = {}
|
||||
|
||||
tools_section = self._extract_section(content, "Tools Summary")
|
||||
if not tools_section:
|
||||
tools_section = self._extract_section(content, "Tools")
|
||||
if not tools_section:
|
||||
return categories
|
||||
|
||||
# Parse category headers: ### category (N tools)
|
||||
category_pattern = r'###\s*(.+?)\s*(?:\((\d+)\s*tools?\))?\s*\n([^#]+)'
|
||||
for match in re.finditer(category_pattern, tools_section):
|
||||
category = match.group(1).strip()
|
||||
tool_list_text = match.group(3).strip()
|
||||
|
||||
# Extract tool names from backtick lists
|
||||
tool_names = re.findall(r'`([a-z_]+)`', tool_list_text)
|
||||
if tool_names:
|
||||
categories[category] = tool_names
|
||||
|
||||
return categories
|
||||
|
||||
def _extract_features(self, content: str) -> list[str]:
|
||||
"""Extract features from Features section"""
|
||||
features = []
|
||||
|
||||
features_section = self._extract_section(content, "Features")
|
||||
if not features_section:
|
||||
return features
|
||||
|
||||
# Parse bullet points
|
||||
bullet_pattern = r'^[-*]\s+\*\*(.+?)\*\*'
|
||||
for match in re.finditer(bullet_pattern, features_section, re.MULTILINE):
|
||||
features.append(match.group(1).strip())
|
||||
|
||||
return features
|
||||
|
||||
def _extract_section(self, content: str, section_name: str) -> Optional[str]:
|
||||
"""Extract content of a markdown section by header name"""
|
||||
# Match ## Section Name - include all content until next ## (same level or higher)
|
||||
pattern = rf'^##\s+{re.escape(section_name)}(?:\s*\([^)]*\))?\s*\n(.*?)(?=\n##[^#]|\Z)'
|
||||
match = re.search(pattern, content, re.MULTILINE | re.DOTALL | re.IGNORECASE)
|
||||
if match:
|
||||
return match.group(1).strip()
|
||||
|
||||
# Try ### level - include content until next ## or ###
|
||||
pattern = rf'^###\s+{re.escape(section_name)}(?:\s*\([^)]*\))?\s*\n(.*?)(?=\n##|\n###[^#]|\Z)'
|
||||
match = re.search(pattern, content, re.MULTILINE | re.DOTALL | re.IGNORECASE)
|
||||
if match:
|
||||
return match.group(1).strip()
|
||||
|
||||
return None
|
||||
|
||||
def _extract_agents_from_claude_md(self, content: str) -> list[ClaudeMdAgent]:
|
||||
"""Extract agent definitions from CLAUDE.md"""
|
||||
agents = []
|
||||
|
||||
# Look for Four-Agent Model section specifically
|
||||
# Match section headers like "### Four-Agent Model (projman)" or "## Four-Agent Model"
|
||||
agent_model_match = re.search(
|
||||
r'^##[#]?\s+Four-Agent Model.*?\n(.*?)(?=\n##[^#]|\Z)',
|
||||
content, re.MULTILINE | re.DOTALL
|
||||
)
|
||||
agent_model_section = agent_model_match.group(1) if agent_model_match else None
|
||||
|
||||
if agent_model_section:
|
||||
# Parse agent table within this section
|
||||
# | **Planner** | Thoughtful, methodical | Sprint planning, ... |
|
||||
# Match rows where first cell starts with ** (bold) and contains a capitalized word
|
||||
agent_table_pattern = r'\|\s*\*\*([A-Z][a-zA-Z\s]+?)\*\*\s*\|\s*([^|]+)\s*\|\s*([^|]+)\s*\|'
|
||||
|
||||
for match in re.finditer(agent_table_pattern, agent_model_section):
|
||||
agent_name = match.group(1).strip()
|
||||
personality = match.group(2).strip()
|
||||
responsibilities = match.group(3).strip()
|
||||
|
||||
# Skip header rows and separator rows
|
||||
if agent_name.lower() in ('agent', 'agents', '---', '-', ''):
|
||||
continue
|
||||
if 'personality' in personality.lower() or '---' in personality:
|
||||
continue
|
||||
|
||||
# Skip if personality looks like tool names (contains backticks)
|
||||
if '`' in personality:
|
||||
continue
|
||||
|
||||
# Extract tool references from responsibilities
|
||||
tool_refs = re.findall(r'`([a-z_]+)`', responsibilities)
|
||||
|
||||
# Split responsibilities by comma
|
||||
resp_list = [r.strip() for r in responsibilities.split(',')]
|
||||
|
||||
agents.append(ClaudeMdAgent(
|
||||
name=agent_name,
|
||||
personality=personality,
|
||||
responsibilities=resp_list,
|
||||
tool_refs=tool_refs
|
||||
))
|
||||
|
||||
# Also look for agents table in ## Agents section
|
||||
agents_section = self._extract_section(content, "Agents")
|
||||
if agents_section:
|
||||
# Parse table: | Agent | Description |
|
||||
table_pattern = r'\|\s*`?([a-z][-a-z0-9_]+)`?\s*\|\s*([^|]+)\s*\|'
|
||||
for match in re.finditer(table_pattern, agents_section):
|
||||
agent_name = match.group(1).strip()
|
||||
desc = match.group(2).strip()
|
||||
|
||||
# Skip header rows
|
||||
if agent_name.lower() in ('agent', 'agents', '---', '-'):
|
||||
continue
|
||||
|
||||
# Check if agent already exists
|
||||
if not any(a.name.lower() == agent_name.lower() for a in agents):
|
||||
agents.append(ClaudeMdAgent(
|
||||
name=agent_name,
|
||||
responsibilities=[desc] if desc else []
|
||||
))
|
||||
|
||||
# Look for workflow sections to enrich agent data
|
||||
workflow_section = self._extract_section(content, "Workflow")
|
||||
if workflow_section:
|
||||
# Parse numbered steps
|
||||
step_pattern = r'^\d+\.\s+(.+?)$'
|
||||
workflow_steps = re.findall(step_pattern, workflow_section, re.MULTILINE)
|
||||
|
||||
# Associate workflow steps with agents mentioned
|
||||
for agent in agents:
|
||||
for step in workflow_steps:
|
||||
if agent.name.lower() in step.lower():
|
||||
agent.workflow_steps.append(step)
|
||||
# Extract any tool references in the step
|
||||
step_tools = re.findall(r'`([a-z_]+)`', step)
|
||||
agent.tool_refs.extend(t for t in step_tools if t not in agent.tool_refs)
|
||||
|
||||
# Look for agent-specific sections (### Planner Agent)
|
||||
agent_section_pattern = r'^###?\s+([A-Z][a-z]+(?:\s+[A-Z][a-z]+)?)\s+Agent\s*\n(.*?)(?=\n##|\n###|\Z)'
|
||||
for match in re.finditer(agent_section_pattern, content, re.MULTILINE | re.DOTALL):
|
||||
agent_name = match.group(1).strip()
|
||||
section_content = match.group(2).strip()
|
||||
|
||||
# Check if agent already exists
|
||||
existing = next((a for a in agents if a.name.lower() == agent_name.lower()), None)
|
||||
if existing:
|
||||
# Add tool refs from this section
|
||||
tool_refs = re.findall(r'`([a-z_]+)`', section_content)
|
||||
existing.tool_refs.extend(t for t in tool_refs if t not in existing.tool_refs)
|
||||
else:
|
||||
tool_refs = re.findall(r'`([a-z_]+)`', section_content)
|
||||
agents.append(ClaudeMdAgent(
|
||||
name=agent_name,
|
||||
tool_refs=tool_refs
|
||||
))
|
||||
|
||||
return agents
|
||||
337
mcp-servers/contract-validator/mcp_server/report_tools.py
Normal file
337
mcp-servers/contract-validator/mcp_server/report_tools.py
Normal file
@@ -0,0 +1,337 @@
|
||||
"""
|
||||
Report tools for generating compatibility reports and listing issues.
|
||||
|
||||
Provides:
|
||||
- generate_compatibility_report: Full marketplace validation report
|
||||
- list_issues: Filtered issue listing
|
||||
"""
|
||||
import os
|
||||
from pathlib import Path
|
||||
from datetime import datetime
|
||||
from typing import Optional
|
||||
from pydantic import BaseModel
|
||||
|
||||
from .parse_tools import ParseTools
|
||||
from .validation_tools import ValidationTools, IssueSeverity, IssueType, ValidationIssue
|
||||
|
||||
|
||||
class ReportSummary(BaseModel):
|
||||
"""Summary statistics for a report"""
|
||||
total_plugins: int = 0
|
||||
total_commands: int = 0
|
||||
total_agents: int = 0
|
||||
total_tools: int = 0
|
||||
total_issues: int = 0
|
||||
errors: int = 0
|
||||
warnings: int = 0
|
||||
info: int = 0
|
||||
|
||||
|
||||
class ReportTools:
|
||||
"""Tools for generating reports and listing issues"""
|
||||
|
||||
def __init__(self):
|
||||
self.parse_tools = ParseTools()
|
||||
self.validation_tools = ValidationTools()
|
||||
|
||||
async def generate_compatibility_report(
|
||||
self,
|
||||
marketplace_path: str,
|
||||
format: str = "markdown"
|
||||
) -> dict:
|
||||
"""
|
||||
Generate a comprehensive compatibility report for all plugins.
|
||||
|
||||
Args:
|
||||
marketplace_path: Path to marketplace root directory
|
||||
format: Output format ("markdown" or "json")
|
||||
|
||||
Returns:
|
||||
Full compatibility report with all findings
|
||||
"""
|
||||
marketplace = Path(marketplace_path)
|
||||
plugins_dir = marketplace / "plugins"
|
||||
|
||||
if not plugins_dir.exists():
|
||||
return {
|
||||
"error": f"Plugins directory not found at {plugins_dir}",
|
||||
"marketplace_path": marketplace_path
|
||||
}
|
||||
|
||||
# Discover all plugins
|
||||
plugins = []
|
||||
for item in plugins_dir.iterdir():
|
||||
if item.is_dir() and (item / ".claude-plugin").exists():
|
||||
plugins.append(item)
|
||||
|
||||
if not plugins:
|
||||
return {
|
||||
"error": "No plugins found in marketplace",
|
||||
"marketplace_path": marketplace_path
|
||||
}
|
||||
|
||||
# Parse all plugin interfaces
|
||||
interfaces = {}
|
||||
all_issues = []
|
||||
summary = ReportSummary(total_plugins=len(plugins))
|
||||
|
||||
for plugin_path in plugins:
|
||||
interface = await self.parse_tools.parse_plugin_interface(str(plugin_path))
|
||||
if "error" not in interface:
|
||||
interfaces[interface["plugin_name"]] = interface
|
||||
summary.total_commands += len(interface.get("commands", []))
|
||||
summary.total_agents += len(interface.get("agents", []))
|
||||
summary.total_tools += len(interface.get("tools", []))
|
||||
|
||||
# Run pairwise compatibility checks
|
||||
plugin_names = list(interfaces.keys())
|
||||
compatibility_results = []
|
||||
|
||||
for i, name_a in enumerate(plugin_names):
|
||||
for name_b in plugin_names[i+1:]:
|
||||
path_a = plugins_dir / self._find_plugin_dir(plugins_dir, name_a)
|
||||
path_b = plugins_dir / self._find_plugin_dir(plugins_dir, name_b)
|
||||
|
||||
result = await self.validation_tools.validate_compatibility(
|
||||
str(path_a), str(path_b)
|
||||
)
|
||||
|
||||
if "error" not in result:
|
||||
compatibility_results.append(result)
|
||||
all_issues.extend(result.get("issues", []))
|
||||
|
||||
# Parse CLAUDE.md if exists
|
||||
claude_md = marketplace / "CLAUDE.md"
|
||||
agents_from_claude = []
|
||||
if claude_md.exists():
|
||||
agents_result = await self.parse_tools.parse_claude_md_agents(str(claude_md))
|
||||
if "error" not in agents_result:
|
||||
agents_from_claude = agents_result.get("agents", [])
|
||||
|
||||
# Validate each agent
|
||||
for agent in agents_from_claude:
|
||||
agent_result = await self.validation_tools.validate_agent_refs(
|
||||
agent["name"],
|
||||
str(claude_md),
|
||||
[str(p) for p in plugins]
|
||||
)
|
||||
if "error" not in agent_result:
|
||||
all_issues.extend(agent_result.get("issues", []))
|
||||
|
||||
# Count issues by severity
|
||||
for issue in all_issues:
|
||||
severity = issue.get("severity", "info")
|
||||
if isinstance(severity, str):
|
||||
severity_str = severity.lower()
|
||||
else:
|
||||
severity_str = severity.value if hasattr(severity, 'value') else str(severity).lower()
|
||||
|
||||
if "error" in severity_str:
|
||||
summary.errors += 1
|
||||
elif "warning" in severity_str:
|
||||
summary.warnings += 1
|
||||
else:
|
||||
summary.info += 1
|
||||
|
||||
summary.total_issues = len(all_issues)
|
||||
|
||||
# Generate report
|
||||
if format == "json":
|
||||
return {
|
||||
"generated_at": datetime.now().isoformat(),
|
||||
"marketplace_path": marketplace_path,
|
||||
"summary": summary.model_dump(),
|
||||
"plugins": interfaces,
|
||||
"compatibility_checks": compatibility_results,
|
||||
"claude_md_agents": agents_from_claude,
|
||||
"all_issues": all_issues
|
||||
}
|
||||
else:
|
||||
# Generate markdown report
|
||||
report = self._generate_markdown_report(
|
||||
marketplace_path,
|
||||
summary,
|
||||
interfaces,
|
||||
compatibility_results,
|
||||
agents_from_claude,
|
||||
all_issues
|
||||
)
|
||||
return {
|
||||
"generated_at": datetime.now().isoformat(),
|
||||
"marketplace_path": marketplace_path,
|
||||
"summary": summary.model_dump(),
|
||||
"report": report
|
||||
}
|
||||
|
||||
def _find_plugin_dir(self, plugins_dir: Path, plugin_name: str) -> str:
|
||||
"""Find plugin directory by name (handles naming variations)"""
|
||||
# Try exact match first
|
||||
for item in plugins_dir.iterdir():
|
||||
if item.is_dir():
|
||||
if item.name.lower() == plugin_name.lower():
|
||||
return item.name
|
||||
# Check plugin.json for name
|
||||
plugin_json = item / ".claude-plugin" / "plugin.json"
|
||||
if plugin_json.exists():
|
||||
import json
|
||||
try:
|
||||
data = json.loads(plugin_json.read_text())
|
||||
if data.get("name", "").lower() == plugin_name.lower():
|
||||
return item.name
|
||||
except:
|
||||
pass
|
||||
return plugin_name
|
||||
|
||||
def _generate_markdown_report(
|
||||
self,
|
||||
marketplace_path: str,
|
||||
summary: ReportSummary,
|
||||
interfaces: dict,
|
||||
compatibility_results: list,
|
||||
agents: list,
|
||||
issues: list
|
||||
) -> str:
|
||||
"""Generate markdown formatted report"""
|
||||
lines = [
|
||||
"# Contract Validation Report",
|
||||
"",
|
||||
f"**Generated:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
|
||||
f"**Marketplace:** `{marketplace_path}`",
|
||||
"",
|
||||
"## Summary",
|
||||
"",
|
||||
f"| Metric | Count |",
|
||||
f"|--------|-------|",
|
||||
f"| Plugins | {summary.total_plugins} |",
|
||||
f"| Commands | {summary.total_commands} |",
|
||||
f"| Agents | {summary.total_agents} |",
|
||||
f"| Tools | {summary.total_tools} |",
|
||||
f"| **Issues** | **{summary.total_issues}** |",
|
||||
f"| - Errors | {summary.errors} |",
|
||||
f"| - Warnings | {summary.warnings} |",
|
||||
f"| - Info | {summary.info} |",
|
||||
"",
|
||||
]
|
||||
|
||||
# Plugin details
|
||||
lines.extend([
|
||||
"## Plugins",
|
||||
"",
|
||||
])
|
||||
|
||||
for name, interface in interfaces.items():
|
||||
cmds = len(interface.get("commands", []))
|
||||
agents_count = len(interface.get("agents", []))
|
||||
tools = len(interface.get("tools", []))
|
||||
lines.append(f"### {name}")
|
||||
lines.append("")
|
||||
lines.append(f"- Commands: {cmds}")
|
||||
lines.append(f"- Agents: {agents_count}")
|
||||
lines.append(f"- Tools: {tools}")
|
||||
lines.append("")
|
||||
|
||||
# Compatibility results
|
||||
if compatibility_results:
|
||||
lines.extend([
|
||||
"## Compatibility Checks",
|
||||
"",
|
||||
])
|
||||
|
||||
for result in compatibility_results:
|
||||
status = "✓" if result.get("compatible", True) else "✗"
|
||||
lines.append(f"### {result['plugin_a']} ↔ {result['plugin_b']} {status}")
|
||||
lines.append("")
|
||||
|
||||
if result.get("shared_tools"):
|
||||
lines.append(f"- Shared tools: `{', '.join(result['shared_tools'])}`")
|
||||
if result.get("issues"):
|
||||
for issue in result["issues"]:
|
||||
sev = issue.get("severity", "info")
|
||||
if hasattr(sev, 'value'):
|
||||
sev = sev.value
|
||||
lines.append(f"- [{sev.upper()}] {issue['message']}")
|
||||
lines.append("")
|
||||
|
||||
# Issues section
|
||||
if issues:
|
||||
lines.extend([
|
||||
"## All Issues",
|
||||
"",
|
||||
"| Severity | Type | Message |",
|
||||
"|----------|------|---------|",
|
||||
])
|
||||
|
||||
for issue in issues:
|
||||
sev = issue.get("severity", "info")
|
||||
itype = issue.get("issue_type", "unknown")
|
||||
msg = issue.get("message", "")
|
||||
|
||||
if hasattr(sev, 'value'):
|
||||
sev = sev.value
|
||||
if hasattr(itype, 'value'):
|
||||
itype = itype.value
|
||||
|
||||
# Truncate message for table
|
||||
msg_short = msg[:60] + "..." if len(msg) > 60 else msg
|
||||
lines.append(f"| {sev} | {itype} | {msg_short} |")
|
||||
|
||||
lines.append("")
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
async def list_issues(
|
||||
self,
|
||||
marketplace_path: str,
|
||||
severity: str = "all",
|
||||
issue_type: str = "all"
|
||||
) -> dict:
|
||||
"""
|
||||
List validation issues with optional filtering.
|
||||
|
||||
Args:
|
||||
marketplace_path: Path to marketplace root directory
|
||||
severity: Filter by severity ("error", "warning", "info", "all")
|
||||
issue_type: Filter by type ("missing_tool", "interface_mismatch", etc., "all")
|
||||
|
||||
Returns:
|
||||
Filtered list of issues
|
||||
"""
|
||||
# Generate full report first
|
||||
report = await self.generate_compatibility_report(marketplace_path, format="json")
|
||||
|
||||
if "error" in report:
|
||||
return report
|
||||
|
||||
all_issues = report.get("all_issues", [])
|
||||
|
||||
# Filter by severity
|
||||
if severity != "all":
|
||||
filtered = []
|
||||
for issue in all_issues:
|
||||
issue_sev = issue.get("severity", "info")
|
||||
if hasattr(issue_sev, 'value'):
|
||||
issue_sev = issue_sev.value
|
||||
if isinstance(issue_sev, str) and severity.lower() in issue_sev.lower():
|
||||
filtered.append(issue)
|
||||
all_issues = filtered
|
||||
|
||||
# Filter by type
|
||||
if issue_type != "all":
|
||||
filtered = []
|
||||
for issue in all_issues:
|
||||
itype = issue.get("issue_type", "unknown")
|
||||
if hasattr(itype, 'value'):
|
||||
itype = itype.value
|
||||
if isinstance(itype, str) and issue_type.lower() in itype.lower():
|
||||
filtered.append(issue)
|
||||
all_issues = filtered
|
||||
|
||||
return {
|
||||
"marketplace_path": marketplace_path,
|
||||
"filters": {
|
||||
"severity": severity,
|
||||
"issue_type": issue_type
|
||||
},
|
||||
"total_issues": len(all_issues),
|
||||
"issues": all_issues
|
||||
}
|
||||
274
mcp-servers/contract-validator/mcp_server/server.py
Normal file
274
mcp-servers/contract-validator/mcp_server/server.py
Normal file
@@ -0,0 +1,274 @@
|
||||
"""
|
||||
MCP Server entry point for Contract Validator.
|
||||
|
||||
Provides cross-plugin compatibility validation and Claude.md agent verification
|
||||
tools to Claude Code via JSON-RPC 2.0 over stdio.
|
||||
"""
|
||||
import asyncio
|
||||
import logging
|
||||
import json
|
||||
from mcp.server import Server
|
||||
from mcp.server.stdio import stdio_server
|
||||
from mcp.types import Tool, TextContent
|
||||
|
||||
from .parse_tools import ParseTools
|
||||
from .validation_tools import ValidationTools
|
||||
from .report_tools import ReportTools
|
||||
|
||||
# Suppress noisy MCP validation warnings on stderr
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
logging.getLogger("root").setLevel(logging.ERROR)
|
||||
logging.getLogger("mcp").setLevel(logging.ERROR)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ContractValidatorMCPServer:
|
||||
"""MCP Server for cross-plugin compatibility validation"""
|
||||
|
||||
def __init__(self):
|
||||
self.server = Server("contract-validator-mcp")
|
||||
self.parse_tools = ParseTools()
|
||||
self.validation_tools = ValidationTools()
|
||||
self.report_tools = ReportTools()
|
||||
|
||||
async def initialize(self):
|
||||
"""Initialize server."""
|
||||
logger.info("Contract Validator MCP Server initialized")
|
||||
|
||||
def setup_tools(self):
|
||||
"""Register all available tools with the MCP server"""
|
||||
|
||||
@self.server.list_tools()
|
||||
async def list_tools() -> list[Tool]:
|
||||
"""Return list of available tools"""
|
||||
tools = [
|
||||
# Parse tools (to be implemented in #186)
|
||||
Tool(
|
||||
name="parse_plugin_interface",
|
||||
description="Parse plugin README.md to extract interface declarations (inputs, outputs, tools)",
|
||||
inputSchema={
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"plugin_path": {
|
||||
"type": "string",
|
||||
"description": "Path to plugin directory or README.md"
|
||||
}
|
||||
},
|
||||
"required": ["plugin_path"]
|
||||
}
|
||||
),
|
||||
Tool(
|
||||
name="parse_claude_md_agents",
|
||||
description="Parse Claude.md to extract agent definitions and their tool sequences",
|
||||
inputSchema={
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"claude_md_path": {
|
||||
"type": "string",
|
||||
"description": "Path to CLAUDE.md file"
|
||||
}
|
||||
},
|
||||
"required": ["claude_md_path"]
|
||||
}
|
||||
),
|
||||
# Validation tools (to be implemented in #187)
|
||||
Tool(
|
||||
name="validate_compatibility",
|
||||
description="Validate compatibility between two plugin interfaces",
|
||||
inputSchema={
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"plugin_a": {
|
||||
"type": "string",
|
||||
"description": "Path to first plugin"
|
||||
},
|
||||
"plugin_b": {
|
||||
"type": "string",
|
||||
"description": "Path to second plugin"
|
||||
}
|
||||
},
|
||||
"required": ["plugin_a", "plugin_b"]
|
||||
}
|
||||
),
|
||||
Tool(
|
||||
name="validate_agent_refs",
|
||||
description="Validate that all tool references in an agent definition exist",
|
||||
inputSchema={
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"agent_name": {
|
||||
"type": "string",
|
||||
"description": "Name of agent to validate"
|
||||
},
|
||||
"claude_md_path": {
|
||||
"type": "string",
|
||||
"description": "Path to CLAUDE.md containing agent"
|
||||
},
|
||||
"plugin_paths": {
|
||||
"type": "array",
|
||||
"items": {"type": "string"},
|
||||
"description": "Paths to available plugins"
|
||||
}
|
||||
},
|
||||
"required": ["agent_name", "claude_md_path"]
|
||||
}
|
||||
),
|
||||
Tool(
|
||||
name="validate_data_flow",
|
||||
description="Validate data flow through an agent's tool sequence",
|
||||
inputSchema={
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"agent_name": {
|
||||
"type": "string",
|
||||
"description": "Name of agent to validate"
|
||||
},
|
||||
"claude_md_path": {
|
||||
"type": "string",
|
||||
"description": "Path to CLAUDE.md containing agent"
|
||||
}
|
||||
},
|
||||
"required": ["agent_name", "claude_md_path"]
|
||||
}
|
||||
),
|
||||
# Report tools (to be implemented in #188)
|
||||
Tool(
|
||||
name="generate_compatibility_report",
|
||||
description="Generate a comprehensive compatibility report for all plugins",
|
||||
inputSchema={
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"marketplace_path": {
|
||||
"type": "string",
|
||||
"description": "Path to marketplace root directory"
|
||||
},
|
||||
"format": {
|
||||
"type": "string",
|
||||
"enum": ["markdown", "json"],
|
||||
"default": "markdown",
|
||||
"description": "Output format"
|
||||
}
|
||||
},
|
||||
"required": ["marketplace_path"]
|
||||
}
|
||||
),
|
||||
Tool(
|
||||
name="list_issues",
|
||||
description="List validation issues with optional filtering",
|
||||
inputSchema={
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"marketplace_path": {
|
||||
"type": "string",
|
||||
"description": "Path to marketplace root directory"
|
||||
},
|
||||
"severity": {
|
||||
"type": "string",
|
||||
"enum": ["error", "warning", "info", "all"],
|
||||
"default": "all",
|
||||
"description": "Filter by severity"
|
||||
},
|
||||
"issue_type": {
|
||||
"type": "string",
|
||||
"enum": ["missing_tool", "interface_mismatch", "optional_dependency", "undeclared_output", "all"],
|
||||
"default": "all",
|
||||
"description": "Filter by issue type"
|
||||
}
|
||||
},
|
||||
"required": ["marketplace_path"]
|
||||
}
|
||||
)
|
||||
]
|
||||
return tools
|
||||
|
||||
@self.server.call_tool()
|
||||
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
|
||||
"""Handle tool invocation."""
|
||||
try:
|
||||
# All tools return placeholder responses for now
|
||||
# Actual implementation will be added in issues #186, #187, #188
|
||||
|
||||
if name == "parse_plugin_interface":
|
||||
result = await self._parse_plugin_interface(**arguments)
|
||||
elif name == "parse_claude_md_agents":
|
||||
result = await self._parse_claude_md_agents(**arguments)
|
||||
elif name == "validate_compatibility":
|
||||
result = await self._validate_compatibility(**arguments)
|
||||
elif name == "validate_agent_refs":
|
||||
result = await self._validate_agent_refs(**arguments)
|
||||
elif name == "validate_data_flow":
|
||||
result = await self._validate_data_flow(**arguments)
|
||||
elif name == "generate_compatibility_report":
|
||||
result = await self._generate_compatibility_report(**arguments)
|
||||
elif name == "list_issues":
|
||||
result = await self._list_issues(**arguments)
|
||||
else:
|
||||
raise ValueError(f"Unknown tool: {name}")
|
||||
|
||||
return [TextContent(
|
||||
type="text",
|
||||
text=json.dumps(result, indent=2, default=str)
|
||||
)]
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Tool {name} failed: {e}")
|
||||
return [TextContent(
|
||||
type="text",
|
||||
text=json.dumps({"error": str(e)}, indent=2)
|
||||
)]
|
||||
|
||||
# Parse tool implementations (Issue #186)
|
||||
|
||||
async def _parse_plugin_interface(self, plugin_path: str) -> dict:
|
||||
"""Parse plugin interface from README.md"""
|
||||
return await self.parse_tools.parse_plugin_interface(plugin_path)
|
||||
|
||||
async def _parse_claude_md_agents(self, claude_md_path: str) -> dict:
|
||||
"""Parse agents from CLAUDE.md"""
|
||||
return await self.parse_tools.parse_claude_md_agents(claude_md_path)
|
||||
|
||||
# Validation tool implementations (Issue #187)
|
||||
|
||||
async def _validate_compatibility(self, plugin_a: str, plugin_b: str) -> dict:
|
||||
"""Validate compatibility between plugins"""
|
||||
return await self.validation_tools.validate_compatibility(plugin_a, plugin_b)
|
||||
|
||||
async def _validate_agent_refs(self, agent_name: str, claude_md_path: str, plugin_paths: list = None) -> dict:
|
||||
"""Validate agent tool references"""
|
||||
return await self.validation_tools.validate_agent_refs(agent_name, claude_md_path, plugin_paths)
|
||||
|
||||
async def _validate_data_flow(self, agent_name: str, claude_md_path: str) -> dict:
|
||||
"""Validate agent data flow"""
|
||||
return await self.validation_tools.validate_data_flow(agent_name, claude_md_path)
|
||||
|
||||
# Report tool implementations (Issue #188)
|
||||
|
||||
async def _generate_compatibility_report(self, marketplace_path: str, format: str = "markdown") -> dict:
|
||||
"""Generate comprehensive compatibility report"""
|
||||
return await self.report_tools.generate_compatibility_report(marketplace_path, format)
|
||||
|
||||
async def _list_issues(self, marketplace_path: str, severity: str = "all", issue_type: str = "all") -> dict:
|
||||
"""List validation issues with filtering"""
|
||||
return await self.report_tools.list_issues(marketplace_path, severity, issue_type)
|
||||
|
||||
async def run(self):
|
||||
"""Run the MCP server"""
|
||||
await self.initialize()
|
||||
self.setup_tools()
|
||||
|
||||
async with stdio_server() as (read_stream, write_stream):
|
||||
await self.server.run(
|
||||
read_stream,
|
||||
write_stream,
|
||||
self.server.create_initialization_options()
|
||||
)
|
||||
|
||||
|
||||
async def main():
|
||||
"""Main entry point"""
|
||||
server = ContractValidatorMCPServer()
|
||||
await server.run()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
338
mcp-servers/contract-validator/mcp_server/validation_tools.py
Normal file
338
mcp-servers/contract-validator/mcp_server/validation_tools.py
Normal file
@@ -0,0 +1,338 @@
|
||||
"""
|
||||
Validation tools for checking cross-plugin compatibility and agent references.
|
||||
|
||||
Provides:
|
||||
- validate_compatibility: Compare two plugin interfaces
|
||||
- validate_agent_refs: Check agent tool references exist
|
||||
- validate_data_flow: Verify data flow through agent sequences
|
||||
"""
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
from pydantic import BaseModel
|
||||
from enum import Enum
|
||||
|
||||
from .parse_tools import ParseTools, PluginInterface, ClaudeMdAgent
|
||||
|
||||
|
||||
class IssueSeverity(str, Enum):
|
||||
ERROR = "error"
|
||||
WARNING = "warning"
|
||||
INFO = "info"
|
||||
|
||||
|
||||
class IssueType(str, Enum):
|
||||
MISSING_TOOL = "missing_tool"
|
||||
INTERFACE_MISMATCH = "interface_mismatch"
|
||||
OPTIONAL_DEPENDENCY = "optional_dependency"
|
||||
UNDECLARED_OUTPUT = "undeclared_output"
|
||||
INVALID_SEQUENCE = "invalid_sequence"
|
||||
|
||||
|
||||
class ValidationIssue(BaseModel):
|
||||
"""A single validation issue"""
|
||||
severity: IssueSeverity
|
||||
issue_type: IssueType
|
||||
message: str
|
||||
location: Optional[str] = None
|
||||
suggestion: Optional[str] = None
|
||||
|
||||
|
||||
class CompatibilityResult(BaseModel):
|
||||
"""Result of compatibility check between two plugins"""
|
||||
plugin_a: str
|
||||
plugin_b: str
|
||||
compatible: bool
|
||||
shared_tools: list[str] = []
|
||||
a_only_tools: list[str] = []
|
||||
b_only_tools: list[str] = []
|
||||
issues: list[ValidationIssue] = []
|
||||
|
||||
|
||||
class AgentValidationResult(BaseModel):
|
||||
"""Result of agent reference validation"""
|
||||
agent_name: str
|
||||
valid: bool
|
||||
tool_refs_found: list[str] = []
|
||||
tool_refs_missing: list[str] = []
|
||||
issues: list[ValidationIssue] = []
|
||||
|
||||
|
||||
class DataFlowResult(BaseModel):
|
||||
"""Result of data flow validation"""
|
||||
agent_name: str
|
||||
valid: bool
|
||||
flow_steps: list[str] = []
|
||||
issues: list[ValidationIssue] = []
|
||||
|
||||
|
||||
class ValidationTools:
|
||||
"""Tools for validating plugin compatibility and agent references"""
|
||||
|
||||
def __init__(self):
|
||||
self.parse_tools = ParseTools()
|
||||
|
||||
async def validate_compatibility(self, plugin_a: str, plugin_b: str) -> dict:
|
||||
"""
|
||||
Validate compatibility between two plugin interfaces.
|
||||
|
||||
Compares tools, commands, and agents to identify overlaps and gaps.
|
||||
|
||||
Args:
|
||||
plugin_a: Path to first plugin directory
|
||||
plugin_b: Path to second plugin directory
|
||||
|
||||
Returns:
|
||||
Compatibility report with shared tools, unique tools, and issues
|
||||
"""
|
||||
# Parse both plugins
|
||||
interface_a = await self.parse_tools.parse_plugin_interface(plugin_a)
|
||||
interface_b = await self.parse_tools.parse_plugin_interface(plugin_b)
|
||||
|
||||
# Check for parse errors
|
||||
if "error" in interface_a:
|
||||
return {
|
||||
"error": f"Failed to parse plugin A: {interface_a['error']}",
|
||||
"plugin_a": plugin_a,
|
||||
"plugin_b": plugin_b
|
||||
}
|
||||
if "error" in interface_b:
|
||||
return {
|
||||
"error": f"Failed to parse plugin B: {interface_b['error']}",
|
||||
"plugin_a": plugin_a,
|
||||
"plugin_b": plugin_b
|
||||
}
|
||||
|
||||
# Extract tool names
|
||||
tools_a = set(t["name"] for t in interface_a.get("tools", []))
|
||||
tools_b = set(t["name"] for t in interface_b.get("tools", []))
|
||||
|
||||
# Find overlaps and differences
|
||||
shared = tools_a & tools_b
|
||||
a_only = tools_a - tools_b
|
||||
b_only = tools_b - tools_a
|
||||
|
||||
issues = []
|
||||
|
||||
# Check for potential naming conflicts
|
||||
if shared:
|
||||
issues.append(ValidationIssue(
|
||||
severity=IssueSeverity.WARNING,
|
||||
issue_type=IssueType.INTERFACE_MISMATCH,
|
||||
message=f"Both plugins define tools with same names: {list(shared)}",
|
||||
location=f"{interface_a['plugin_name']} and {interface_b['plugin_name']}",
|
||||
suggestion="Ensure tools with same names have compatible interfaces"
|
||||
))
|
||||
|
||||
# Check command overlaps
|
||||
cmds_a = set(c["name"] for c in interface_a.get("commands", []))
|
||||
cmds_b = set(c["name"] for c in interface_b.get("commands", []))
|
||||
shared_cmds = cmds_a & cmds_b
|
||||
|
||||
if shared_cmds:
|
||||
issues.append(ValidationIssue(
|
||||
severity=IssueSeverity.ERROR,
|
||||
issue_type=IssueType.INTERFACE_MISMATCH,
|
||||
message=f"Command name conflict: {list(shared_cmds)}",
|
||||
location=f"{interface_a['plugin_name']} and {interface_b['plugin_name']}",
|
||||
suggestion="Rename conflicting commands to avoid ambiguity"
|
||||
))
|
||||
|
||||
result = CompatibilityResult(
|
||||
plugin_a=interface_a["plugin_name"],
|
||||
plugin_b=interface_b["plugin_name"],
|
||||
compatible=len([i for i in issues if i.severity == IssueSeverity.ERROR]) == 0,
|
||||
shared_tools=list(shared),
|
||||
a_only_tools=list(a_only),
|
||||
b_only_tools=list(b_only),
|
||||
issues=issues
|
||||
)
|
||||
|
||||
return result.model_dump()
|
||||
|
||||
async def validate_agent_refs(
|
||||
self,
|
||||
agent_name: str,
|
||||
claude_md_path: str,
|
||||
plugin_paths: list[str] = None
|
||||
) -> dict:
|
||||
"""
|
||||
Validate that all tool references in an agent definition exist.
|
||||
|
||||
Args:
|
||||
agent_name: Name of the agent to validate
|
||||
claude_md_path: Path to CLAUDE.md containing the agent
|
||||
plugin_paths: Optional list of plugin paths to check for tools
|
||||
|
||||
Returns:
|
||||
Validation result with found/missing tools and issues
|
||||
"""
|
||||
# Parse CLAUDE.md for agents
|
||||
agents_result = await self.parse_tools.parse_claude_md_agents(claude_md_path)
|
||||
|
||||
if "error" in agents_result:
|
||||
return {
|
||||
"error": agents_result["error"],
|
||||
"agent_name": agent_name
|
||||
}
|
||||
|
||||
# Find the specific agent
|
||||
agent = None
|
||||
for a in agents_result.get("agents", []):
|
||||
if a["name"].lower() == agent_name.lower():
|
||||
agent = a
|
||||
break
|
||||
|
||||
if not agent:
|
||||
return {
|
||||
"error": f"Agent '{agent_name}' not found in {claude_md_path}",
|
||||
"agent_name": agent_name,
|
||||
"available_agents": [a["name"] for a in agents_result.get("agents", [])]
|
||||
}
|
||||
|
||||
# Collect all available tools from plugins
|
||||
available_tools = set()
|
||||
if plugin_paths:
|
||||
for plugin_path in plugin_paths:
|
||||
interface = await self.parse_tools.parse_plugin_interface(plugin_path)
|
||||
if "error" not in interface:
|
||||
for tool in interface.get("tools", []):
|
||||
available_tools.add(tool["name"])
|
||||
|
||||
# Check agent tool references
|
||||
tool_refs = set(agent.get("tool_refs", []))
|
||||
found = tool_refs & available_tools if available_tools else tool_refs
|
||||
missing = tool_refs - available_tools if available_tools else set()
|
||||
|
||||
issues = []
|
||||
|
||||
# Report missing tools
|
||||
for tool in missing:
|
||||
issues.append(ValidationIssue(
|
||||
severity=IssueSeverity.ERROR,
|
||||
issue_type=IssueType.MISSING_TOOL,
|
||||
message=f"Agent '{agent_name}' references tool '{tool}' which is not found",
|
||||
location=claude_md_path,
|
||||
suggestion=f"Check if tool '{tool}' exists or fix the reference"
|
||||
))
|
||||
|
||||
# Check if agent has no tool refs (might be incomplete)
|
||||
if not tool_refs:
|
||||
issues.append(ValidationIssue(
|
||||
severity=IssueSeverity.INFO,
|
||||
issue_type=IssueType.UNDECLARED_OUTPUT,
|
||||
message=f"Agent '{agent_name}' has no documented tool references",
|
||||
location=claude_md_path,
|
||||
suggestion="Consider documenting which tools this agent uses"
|
||||
))
|
||||
|
||||
result = AgentValidationResult(
|
||||
agent_name=agent_name,
|
||||
valid=len([i for i in issues if i.severity == IssueSeverity.ERROR]) == 0,
|
||||
tool_refs_found=list(found),
|
||||
tool_refs_missing=list(missing),
|
||||
issues=issues
|
||||
)
|
||||
|
||||
return result.model_dump()
|
||||
|
||||
async def validate_data_flow(self, agent_name: str, claude_md_path: str) -> dict:
|
||||
"""
|
||||
Validate data flow through an agent's tool sequence.
|
||||
|
||||
Checks that each step's expected output can be used by the next step.
|
||||
|
||||
Args:
|
||||
agent_name: Name of the agent to validate
|
||||
claude_md_path: Path to CLAUDE.md containing the agent
|
||||
|
||||
Returns:
|
||||
Data flow validation result with steps and issues
|
||||
"""
|
||||
# Parse CLAUDE.md for agents
|
||||
agents_result = await self.parse_tools.parse_claude_md_agents(claude_md_path)
|
||||
|
||||
if "error" in agents_result:
|
||||
return {
|
||||
"error": agents_result["error"],
|
||||
"agent_name": agent_name
|
||||
}
|
||||
|
||||
# Find the specific agent
|
||||
agent = None
|
||||
for a in agents_result.get("agents", []):
|
||||
if a["name"].lower() == agent_name.lower():
|
||||
agent = a
|
||||
break
|
||||
|
||||
if not agent:
|
||||
return {
|
||||
"error": f"Agent '{agent_name}' not found in {claude_md_path}",
|
||||
"agent_name": agent_name,
|
||||
"available_agents": [a["name"] for a in agents_result.get("agents", [])]
|
||||
}
|
||||
|
||||
issues = []
|
||||
flow_steps = []
|
||||
|
||||
# Extract workflow steps
|
||||
workflow_steps = agent.get("workflow_steps", [])
|
||||
responsibilities = agent.get("responsibilities", [])
|
||||
|
||||
# Build flow from workflow steps or responsibilities
|
||||
steps = workflow_steps if workflow_steps else responsibilities
|
||||
|
||||
for i, step in enumerate(steps):
|
||||
flow_steps.append(f"Step {i+1}: {step}")
|
||||
|
||||
# Check for data flow patterns
|
||||
tool_refs = agent.get("tool_refs", [])
|
||||
|
||||
# Known data flow patterns
|
||||
# e.g., data-platform produces data_ref, viz-platform consumes it
|
||||
known_producers = {
|
||||
"read_csv": "data_ref",
|
||||
"read_parquet": "data_ref",
|
||||
"pg_query": "data_ref",
|
||||
"filter": "data_ref",
|
||||
"groupby": "data_ref",
|
||||
}
|
||||
|
||||
known_consumers = {
|
||||
"describe": "data_ref",
|
||||
"head": "data_ref",
|
||||
"tail": "data_ref",
|
||||
"to_csv": "data_ref",
|
||||
"to_parquet": "data_ref",
|
||||
}
|
||||
|
||||
# Check if agent uses tools that require data_ref
|
||||
has_producer = any(t in known_producers for t in tool_refs)
|
||||
has_consumer = any(t in known_consumers for t in tool_refs)
|
||||
|
||||
if has_consumer and not has_producer:
|
||||
issues.append(ValidationIssue(
|
||||
severity=IssueSeverity.WARNING,
|
||||
issue_type=IssueType.INTERFACE_MISMATCH,
|
||||
message=f"Agent '{agent_name}' uses tools that consume data_ref but no producer found",
|
||||
location=claude_md_path,
|
||||
suggestion="Ensure a data loading tool (read_csv, pg_query, etc.) is used before data consumers"
|
||||
))
|
||||
|
||||
# Check for empty workflow
|
||||
if not steps and not tool_refs:
|
||||
issues.append(ValidationIssue(
|
||||
severity=IssueSeverity.INFO,
|
||||
issue_type=IssueType.UNDECLARED_OUTPUT,
|
||||
message=f"Agent '{agent_name}' has no documented workflow or tool sequence",
|
||||
location=claude_md_path,
|
||||
suggestion="Consider documenting the agent's workflow steps"
|
||||
))
|
||||
|
||||
result = DataFlowResult(
|
||||
agent_name=agent_name,
|
||||
valid=len([i for i in issues if i.severity == IssueSeverity.ERROR]) == 0,
|
||||
flow_steps=flow_steps,
|
||||
issues=issues
|
||||
)
|
||||
|
||||
return result.model_dump()
|
||||
41
mcp-servers/contract-validator/pyproject.toml
Normal file
41
mcp-servers/contract-validator/pyproject.toml
Normal file
@@ -0,0 +1,41 @@
|
||||
[build-system]
|
||||
requires = ["setuptools>=61.0", "wheel"]
|
||||
build-backend = "setuptools.build_meta"
|
||||
|
||||
[project]
|
||||
name = "contract-validator-mcp"
|
||||
version = "1.0.0"
|
||||
description = "MCP Server for cross-plugin compatibility validation and agent verification"
|
||||
readme = "README.md"
|
||||
license = {text = "MIT"}
|
||||
requires-python = ">=3.10"
|
||||
authors = [
|
||||
{name = "Leo Miranda"}
|
||||
]
|
||||
classifiers = [
|
||||
"Development Status :: 4 - Beta",
|
||||
"Intended Audience :: Developers",
|
||||
"License :: OSI Approved :: MIT License",
|
||||
"Programming Language :: Python :: 3",
|
||||
"Programming Language :: Python :: 3.10",
|
||||
"Programming Language :: Python :: 3.11",
|
||||
"Programming Language :: Python :: 3.12",
|
||||
]
|
||||
dependencies = [
|
||||
"mcp>=0.9.0",
|
||||
"pydantic>=2.5.0",
|
||||
]
|
||||
|
||||
[project.optional-dependencies]
|
||||
dev = [
|
||||
"pytest>=7.4.3",
|
||||
"pytest-asyncio>=0.23.0",
|
||||
]
|
||||
|
||||
[tool.setuptools.packages.find]
|
||||
where = ["."]
|
||||
include = ["mcp_server*"]
|
||||
|
||||
[tool.pytest.ini_options]
|
||||
asyncio_mode = "auto"
|
||||
testpaths = ["tests"]
|
||||
9
mcp-servers/contract-validator/requirements.txt
Normal file
9
mcp-servers/contract-validator/requirements.txt
Normal file
@@ -0,0 +1,9 @@
|
||||
# MCP SDK
|
||||
mcp>=0.9.0
|
||||
|
||||
# Utilities
|
||||
pydantic>=2.5.0
|
||||
|
||||
# Testing
|
||||
pytest>=7.4.3
|
||||
pytest-asyncio>=0.23.0
|
||||
22
plugins/contract-validator/.claude-plugin/plugin.json
Normal file
22
plugins/contract-validator/.claude-plugin/plugin.json
Normal file
@@ -0,0 +1,22 @@
|
||||
{
|
||||
"name": "contract-validator",
|
||||
"version": "1.0.0",
|
||||
"description": "Cross-plugin compatibility validation and Claude.md agent verification",
|
||||
"author": {
|
||||
"name": "Leo Miranda",
|
||||
"email": "leobmiranda@gmail.com"
|
||||
},
|
||||
"homepage": "https://gitea.hotserv.cloud/personal-projects/leo-claude-mktplace/src/branch/main/plugins/contract-validator/README.md",
|
||||
"repository": "https://gitea.hotserv.cloud/personal-projects/leo-claude-mktplace.git",
|
||||
"license": "MIT",
|
||||
"keywords": [
|
||||
"validation",
|
||||
"contracts",
|
||||
"compatibility",
|
||||
"agents",
|
||||
"interfaces",
|
||||
"cross-plugin"
|
||||
],
|
||||
"commands": ["./commands/"],
|
||||
"mcpServers": ["./.mcp.json"]
|
||||
}
|
||||
10
plugins/contract-validator/.mcp.json
Normal file
10
plugins/contract-validator/.mcp.json
Normal file
@@ -0,0 +1,10 @@
|
||||
{
|
||||
"mcpServers": {
|
||||
"contract-validator": {
|
||||
"type": "stdio",
|
||||
"command": "${CLAUDE_PLUGIN_ROOT}/mcp-servers/contract-validator/.venv/bin/python",
|
||||
"args": ["-m", "mcp_server.server"],
|
||||
"cwd": "${CLAUDE_PLUGIN_ROOT}/mcp-servers/contract-validator"
|
||||
}
|
||||
}
|
||||
}
|
||||
1
plugins/contract-validator/mcp-servers/contract-validator
Symbolic link
1
plugins/contract-validator/mcp-servers/contract-validator
Symbolic link
@@ -0,0 +1 @@
|
||||
../../../mcp-servers/contract-validator
|
||||
Reference in New Issue
Block a user