feat(contract-validator): implement validation tools (#187)

Implement validate_compatibility, validate_agent_refs, and
validate_data_flow tools for cross-plugin validation.

validate_compatibility:
- Compares tool and command names between plugins
- Flags shared command names as ERROR and shared tool names as WARNING
- Found: data-platform and projman share /initial-setup command
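
For that pair the result looks roughly like the following (values abridged;
the shape follows the CompatibilityResult model added below):

    {
        "plugin_a": "data-platform",
        "plugin_b": "projman",
        "compatible": False,
        "shared_tools": [],
        "a_only_tools": ["..."],
        "b_only_tools": ["..."],
        "issues": [...]  # ERROR entry for the shared /initial-setup command
    }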

validate_agent_refs:
- Checks agent tool references against available plugins
- Reports missing tools (ERROR) and agents with no documented tool references (INFO)
- Supports optional plugin_paths for tool lookup
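
Sample result (agent and tool names here are placeholders; the shape follows
the AgentValidationResult model):

    {
        "agent_name": "data-engineer",
        "valid": False,
        "tool_refs_found": ["read_csv"],
        "tool_refs_missing": ["unknown_tool"],
        "issues": [...]  # one missing_tool ERROR per missing reference
    }

When plugin_paths is omitted there is nothing to resolve against, so every
documented reference is reported as found and none as missing.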

validate_data_flow:
- Validates data flow through agent tool sequences
- Checks producer/consumer patterns (e.g., data_ref)
- Extracts flow steps from workflow_steps, falling back to responsibilities
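
Sample result (step text is a placeholder; the shape follows the DataFlowResult
model):

    {
        "agent_name": "data-engineer",
        "valid": True,
        "flow_steps": ["Step 1: Load source data", "Step 2: Profile columns"],
        "issues": [...]  # WARNING if a data_ref consumer appears without a producer
    }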

Issue types detected:
- missing_tool (ERROR)
- interface_mismatch (ERROR/WARNING)
- optional_dependency (WARNING)
- undeclared_output (WARNING/INFO)
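
Each issue serializes from the ValidationIssue model; a representative
missing_tool entry (agent and tool names are placeholders):

    {
        "severity": "error",
        "issue_type": "missing_tool",
        "message": "Agent 'data-engineer' references tool 'read_csv' which is not found",
        "location": "CLAUDE.md",
        "suggestion": "Check if tool 'read_csv' exists or fix the reference"
    }

Only error-severity issues flip the top-level compatible/valid flags; warning
and info entries are advisory.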

Sprint: Sprint 2 - contract-validator Plugin

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-26 14:50:12 -05:00
parent 2b041cb771
commit 46af00019c
2 changed files with 350 additions and 23 deletions


@@ -12,6 +12,7 @@ from mcp.server.stdio import stdio_server
 from mcp.types import Tool, TextContent
 from .parse_tools import ParseTools
+from .validation_tools import ValidationTools

 # Suppress noisy MCP validation warnings on stderr
 logging.basicConfig(level=logging.INFO)
@@ -26,10 +27,11 @@ class ContractValidatorMCPServer:
     def __init__(self):
         self.server = Server("contract-validator-mcp")
         self.parse_tools = ParseTools()
+        self.validation_tools = ValidationTools()

     async def initialize(self):
         """Initialize server."""
-        logger.info("Contract Validator MCP Server initialized with parse tools")
+        logger.info("Contract Validator MCP Server initialized with parse and validation tools")

     def setup_tools(self):
         """Register all available tools with the MCP server"""
@@ -223,34 +225,21 @@ class ContractValidatorMCPServer:
         """Parse agents from CLAUDE.md"""
         return await self.parse_tools.parse_claude_md_agents(claude_md_path)

-    # Placeholder implementations - to be completed in subsequent issues
+    # Validation tool implementations (Issue #187)
     async def _validate_compatibility(self, plugin_a: str, plugin_b: str) -> dict:
-        """Validate compatibility between plugins (placeholder)"""
-        return {
-            "status": "not_implemented",
-            "message": "Implementation pending - Issue #187",
-            "plugin_a": plugin_a,
-            "plugin_b": plugin_b
-        }
+        """Validate compatibility between plugins"""
+        return await self.validation_tools.validate_compatibility(plugin_a, plugin_b)

     async def _validate_agent_refs(self, agent_name: str, claude_md_path: str, plugin_paths: list = None) -> dict:
-        """Validate agent tool references (placeholder)"""
-        return {
-            "status": "not_implemented",
-            "message": "Implementation pending - Issue #187",
-            "agent_name": agent_name,
-            "claude_md_path": claude_md_path
-        }
+        """Validate agent tool references"""
+        return await self.validation_tools.validate_agent_refs(agent_name, claude_md_path, plugin_paths)

     async def _validate_data_flow(self, agent_name: str, claude_md_path: str) -> dict:
-        """Validate agent data flow (placeholder)"""
-        return {
-            "status": "not_implemented",
-            "message": "Implementation pending - Issue #187",
-            "agent_name": agent_name,
-            "claude_md_path": claude_md_path
-        }
+        """Validate agent data flow"""
+        return await self.validation_tools.validate_data_flow(agent_name, claude_md_path)

+    # Placeholder implementations - to be completed in subsequent issues
     async def _generate_compatibility_report(self, marketplace_path: str, format: str = "markdown") -> dict:
         """Generate compatibility report (placeholder)"""

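The setup_tools body is not shown in this hunk. For orientation only, a minimal
sketch of the dispatch it presumably performs via the MCP SDK's call_tool
decorator; the handler name, argument handling, and json serialization below are
assumptions, not the committed code:

        # Sketch only - assumes `import json` at module level; not part of this diff
        @self.server.call_tool()
        async def handle_call_tool(name: str, arguments: dict) -> list[TextContent]:
            if name == "validate_compatibility":
                result = await self._validate_compatibility(
                    arguments["plugin_a"], arguments["plugin_b"]
                )
            elif name == "validate_agent_refs":
                result = await self._validate_agent_refs(
                    arguments["agent_name"],
                    arguments["claude_md_path"],
                    arguments.get("plugin_paths"),
                )
            elif name == "validate_data_flow":
                result = await self._validate_data_flow(
                    arguments["agent_name"], arguments["claude_md_path"]
                )
            else:
                raise ValueError(f"Unknown tool: {name}")
            return [TextContent(type="text", text=json.dumps(result, indent=2))]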

@@ -0,0 +1,338 @@
"""
Validation tools for checking cross-plugin compatibility and agent references.
Provides:
- validate_compatibility: Compare two plugin interfaces
- validate_agent_refs: Check agent tool references exist
- validate_data_flow: Verify data flow through agent sequences
"""
from pathlib import Path
from typing import Optional
from pydantic import BaseModel
from enum import Enum
from .parse_tools import ParseTools, PluginInterface, ClaudeMdAgent
class IssueSeverity(str, Enum):
ERROR = "error"
WARNING = "warning"
INFO = "info"
class IssueType(str, Enum):
MISSING_TOOL = "missing_tool"
INTERFACE_MISMATCH = "interface_mismatch"
OPTIONAL_DEPENDENCY = "optional_dependency"
UNDECLARED_OUTPUT = "undeclared_output"
INVALID_SEQUENCE = "invalid_sequence"
class ValidationIssue(BaseModel):
"""A single validation issue"""
severity: IssueSeverity
issue_type: IssueType
message: str
location: Optional[str] = None
suggestion: Optional[str] = None
class CompatibilityResult(BaseModel):
"""Result of compatibility check between two plugins"""
plugin_a: str
plugin_b: str
compatible: bool
shared_tools: list[str] = []
a_only_tools: list[str] = []
b_only_tools: list[str] = []
issues: list[ValidationIssue] = []
class AgentValidationResult(BaseModel):
"""Result of agent reference validation"""
agent_name: str
valid: bool
tool_refs_found: list[str] = []
tool_refs_missing: list[str] = []
issues: list[ValidationIssue] = []
class DataFlowResult(BaseModel):
"""Result of data flow validation"""
agent_name: str
valid: bool
flow_steps: list[str] = []
issues: list[ValidationIssue] = []
class ValidationTools:
"""Tools for validating plugin compatibility and agent references"""
def __init__(self):
self.parse_tools = ParseTools()
async def validate_compatibility(self, plugin_a: str, plugin_b: str) -> dict:
"""
Validate compatibility between two plugin interfaces.
Compares tools, commands, and agents to identify overlaps and gaps.
Args:
plugin_a: Path to first plugin directory
plugin_b: Path to second plugin directory
Returns:
Compatibility report with shared tools, unique tools, and issues
"""
# Parse both plugins
interface_a = await self.parse_tools.parse_plugin_interface(plugin_a)
interface_b = await self.parse_tools.parse_plugin_interface(plugin_b)
# Check for parse errors
if "error" in interface_a:
return {
"error": f"Failed to parse plugin A: {interface_a['error']}",
"plugin_a": plugin_a,
"plugin_b": plugin_b
}
if "error" in interface_b:
return {
"error": f"Failed to parse plugin B: {interface_b['error']}",
"plugin_a": plugin_a,
"plugin_b": plugin_b
}
# Extract tool names
tools_a = set(t["name"] for t in interface_a.get("tools", []))
tools_b = set(t["name"] for t in interface_b.get("tools", []))
# Find overlaps and differences
shared = tools_a & tools_b
a_only = tools_a - tools_b
b_only = tools_b - tools_a
issues = []
# Check for potential naming conflicts
if shared:
issues.append(ValidationIssue(
severity=IssueSeverity.WARNING,
issue_type=IssueType.INTERFACE_MISMATCH,
message=f"Both plugins define tools with same names: {list(shared)}",
location=f"{interface_a['plugin_name']} and {interface_b['plugin_name']}",
suggestion="Ensure tools with same names have compatible interfaces"
))
# Check command overlaps
cmds_a = set(c["name"] for c in interface_a.get("commands", []))
cmds_b = set(c["name"] for c in interface_b.get("commands", []))
shared_cmds = cmds_a & cmds_b
if shared_cmds:
issues.append(ValidationIssue(
severity=IssueSeverity.ERROR,
issue_type=IssueType.INTERFACE_MISMATCH,
message=f"Command name conflict: {list(shared_cmds)}",
location=f"{interface_a['plugin_name']} and {interface_b['plugin_name']}",
suggestion="Rename conflicting commands to avoid ambiguity"
))
result = CompatibilityResult(
plugin_a=interface_a["plugin_name"],
plugin_b=interface_b["plugin_name"],
compatible=len([i for i in issues if i.severity == IssueSeverity.ERROR]) == 0,
shared_tools=list(shared),
a_only_tools=list(a_only),
b_only_tools=list(b_only),
issues=issues
)
return result.model_dump()
async def validate_agent_refs(
self,
agent_name: str,
claude_md_path: str,
plugin_paths: list[str] = None
) -> dict:
"""
Validate that all tool references in an agent definition exist.
Args:
agent_name: Name of the agent to validate
claude_md_path: Path to CLAUDE.md containing the agent
plugin_paths: Optional list of plugin paths to check for tools
Returns:
Validation result with found/missing tools and issues
"""
# Parse CLAUDE.md for agents
agents_result = await self.parse_tools.parse_claude_md_agents(claude_md_path)
if "error" in agents_result:
return {
"error": agents_result["error"],
"agent_name": agent_name
}
# Find the specific agent
agent = None
for a in agents_result.get("agents", []):
if a["name"].lower() == agent_name.lower():
agent = a
break
if not agent:
return {
"error": f"Agent '{agent_name}' not found in {claude_md_path}",
"agent_name": agent_name,
"available_agents": [a["name"] for a in agents_result.get("agents", [])]
}
# Collect all available tools from plugins
available_tools = set()
if plugin_paths:
for plugin_path in plugin_paths:
interface = await self.parse_tools.parse_plugin_interface(plugin_path)
if "error" not in interface:
for tool in interface.get("tools", []):
available_tools.add(tool["name"])
# Check agent tool references
tool_refs = set(agent.get("tool_refs", []))
found = tool_refs & available_tools if available_tools else tool_refs
missing = tool_refs - available_tools if available_tools else set()
issues = []
# Report missing tools
for tool in missing:
issues.append(ValidationIssue(
severity=IssueSeverity.ERROR,
issue_type=IssueType.MISSING_TOOL,
message=f"Agent '{agent_name}' references tool '{tool}' which is not found",
location=claude_md_path,
suggestion=f"Check if tool '{tool}' exists or fix the reference"
))
# Check if agent has no tool refs (might be incomplete)
if not tool_refs:
issues.append(ValidationIssue(
severity=IssueSeverity.INFO,
issue_type=IssueType.UNDECLARED_OUTPUT,
message=f"Agent '{agent_name}' has no documented tool references",
location=claude_md_path,
suggestion="Consider documenting which tools this agent uses"
))
result = AgentValidationResult(
agent_name=agent_name,
valid=len([i for i in issues if i.severity == IssueSeverity.ERROR]) == 0,
tool_refs_found=list(found),
tool_refs_missing=list(missing),
issues=issues
)
return result.model_dump()
async def validate_data_flow(self, agent_name: str, claude_md_path: str) -> dict:
"""
Validate data flow through an agent's tool sequence.
Checks that each step's expected output can be used by the next step.
Args:
agent_name: Name of the agent to validate
claude_md_path: Path to CLAUDE.md containing the agent
Returns:
Data flow validation result with steps and issues
"""
# Parse CLAUDE.md for agents
agents_result = await self.parse_tools.parse_claude_md_agents(claude_md_path)
if "error" in agents_result:
return {
"error": agents_result["error"],
"agent_name": agent_name
}
# Find the specific agent
agent = None
for a in agents_result.get("agents", []):
if a["name"].lower() == agent_name.lower():
agent = a
break
if not agent:
return {
"error": f"Agent '{agent_name}' not found in {claude_md_path}",
"agent_name": agent_name,
"available_agents": [a["name"] for a in agents_result.get("agents", [])]
}
issues = []
flow_steps = []
# Extract workflow steps
workflow_steps = agent.get("workflow_steps", [])
responsibilities = agent.get("responsibilities", [])
# Build flow from workflow steps or responsibilities
steps = workflow_steps if workflow_steps else responsibilities
for i, step in enumerate(steps):
flow_steps.append(f"Step {i+1}: {step}")
# Check for data flow patterns
tool_refs = agent.get("tool_refs", [])
# Known data flow patterns
# e.g., data-platform produces data_ref, viz-platform consumes it
known_producers = {
"read_csv": "data_ref",
"read_parquet": "data_ref",
"pg_query": "data_ref",
"filter": "data_ref",
"groupby": "data_ref",
}
known_consumers = {
"describe": "data_ref",
"head": "data_ref",
"tail": "data_ref",
"to_csv": "data_ref",
"to_parquet": "data_ref",
}
# Check if agent uses tools that require data_ref
has_producer = any(t in known_producers for t in tool_refs)
has_consumer = any(t in known_consumers for t in tool_refs)
if has_consumer and not has_producer:
issues.append(ValidationIssue(
severity=IssueSeverity.WARNING,
issue_type=IssueType.INTERFACE_MISMATCH,
message=f"Agent '{agent_name}' uses tools that consume data_ref but no producer found",
location=claude_md_path,
suggestion="Ensure a data loading tool (read_csv, pg_query, etc.) is used before data consumers"
))
# Check for empty workflow
if not steps and not tool_refs:
issues.append(ValidationIssue(
severity=IssueSeverity.INFO,
issue_type=IssueType.UNDECLARED_OUTPUT,
message=f"Agent '{agent_name}' has no documented workflow or tool sequence",
location=claude_md_path,
suggestion="Consider documenting the agent's workflow steps"
))
result = DataFlowResult(
agent_name=agent_name,
valid=len([i for i in issues if i.severity == IssueSeverity.ERROR]) == 0,
flow_steps=flow_steps,
issues=issues
)
return result.model_dump()
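
For quick manual checks outside the MCP server, the class can also be driven
directly; a minimal sketch (the package import path, plugin paths, agent name,
and CLAUDE.md location are assumptions):

    # Sketch only - adjust the import to the package's actual name
    import asyncio
    from contract_validator.validation_tools import ValidationTools

    async def main():
        tools = ValidationTools()
        compat = await tools.validate_compatibility(
            "plugins/data-platform", "plugins/projman"
        )
        print(compat.get("compatible"), len(compat.get("issues", [])))
        flow = await tools.validate_data_flow("data-engineer", "CLAUDE.md")
        print(flow.get("flow_steps", []))

    asyncio.run(main())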