Files
lmiranda 72b3436a24 feat(contract-validator): add gate contract versioning
- design-gate.md and data-gate.md declare gate_contract: v1
- domain-consultation.md Gate Command Reference includes Contract column
- validate_workflow_integration now checks contract version compatibility
- Tests added for match, mismatch, and missing contract scenarios

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-02 13:54:19 -05:00

494 lines
18 KiB
Python

"""
Validation tools for checking cross-plugin compatibility and agent references.
Provides:
- validate_compatibility: Compare two plugin interfaces
- validate_agent_refs: Check agent tool references exist
- validate_data_flow: Verify data flow through agent sequences
"""
from pathlib import Path
from typing import Optional
from pydantic import BaseModel
from enum import Enum
from .parse_tools import ParseTools, PluginInterface, ClaudeMdAgent
class IssueSeverity(str, Enum):
    """Severity level attached to a validation issue.

    Members are also ``str`` instances, so they compare equal to their
    lowercase string values.
    """

    ERROR = "error"
    WARNING = "warning"
    INFO = "info"
class IssueType(str, Enum):
    """Category of a validation issue.

    Members are also ``str`` instances, so they compare equal to their
    snake_case string values.
    """

    MISSING_TOOL = "missing_tool"
    INTERFACE_MISMATCH = "interface_mismatch"
    OPTIONAL_DEPENDENCY = "optional_dependency"
    UNDECLARED_OUTPUT = "undeclared_output"
    INVALID_SEQUENCE = "invalid_sequence"
    MISSING_INTEGRATION = "missing_integration"
class ValidationIssue(BaseModel):
    """One finding produced by a validation run."""

    # How serious the finding is (error / warning / info).
    severity: IssueSeverity
    # Which category of problem was detected.
    issue_type: IssueType
    # Human-readable description of the finding.
    message: str
    # Where the finding applies; omitted when not known.
    location: Optional[str] = None
    # Proposed remediation; omitted when none is offered.
    suggestion: Optional[str] = None
class CompatibilityResult(BaseModel):
    """Outcome of comparing the interfaces of two plugins."""

    # Names of the two plugins that were compared.
    plugin_a: str
    plugin_b: str
    # Overall verdict for the pair.
    compatible: bool
    # Tool names defined by both plugins.
    shared_tools: list[str] = []
    # Tool names defined only by plugin A / only by plugin B.
    a_only_tools: list[str] = []
    b_only_tools: list[str] = []
    # Individual findings collected during the comparison.
    issues: list[ValidationIssue] = []
class AgentValidationResult(BaseModel):
    """Outcome of validating the tool references of a single agent."""

    # Name of the agent that was checked.
    agent_name: str
    # Overall verdict for the agent.
    valid: bool
    # Referenced tool names that were resolved / could not be resolved.
    tool_refs_found: list[str] = []
    tool_refs_missing: list[str] = []
    # Individual findings collected during the check.
    issues: list[ValidationIssue] = []
class DataFlowResult(BaseModel):
    """Outcome of validating the data flow of a single agent."""

    # Name of the agent that was checked.
    agent_name: str
    # Overall verdict for the agent.
    valid: bool
    # Ordered, human-readable descriptions of the agent's flow steps.
    flow_steps: list[str] = []
    # Individual findings collected during the check.
    issues: list[ValidationIssue] = []
class WorkflowIntegrationResult(BaseModel):
    """Outcome of the workflow-integration check for a domain plugin."""

    # Name of the plugin that was checked.
    plugin_name: str
    # Domain label the plugin claims to handle.
    domain_label: str
    # Overall verdict for the plugin.
    valid: bool
    # Whether a gate command was located.
    gate_command_found: bool
    # Contract version declared by the gate command, if any.
    gate_contract: Optional[str] = None
    # Whether a review command was located.
    review_command_found: bool
    # Whether an advisory agent was located.
    advisory_agent_found: bool
    # Individual findings collected during the check.
    issues: list[ValidationIssue] = []
class ValidationTools:
    """Tools for validating plugin compatibility and agent references.

    Each public ``validate_*`` coroutine returns a plain ``dict``: either the
    ``model_dump()`` of the matching result model, or a small dict carrying an
    ``"error"`` key when the inputs could not be parsed or located.
    """

    # Known data flow patterns,
    # e.g., data-platform produces data_ref, viz-platform consumes it.
    _DATA_REF_PRODUCERS = frozenset(
        {"read_csv", "read_parquet", "pg_query", "filter", "groupby"}
    )
    _DATA_REF_CONSUMERS = frozenset(
        {"describe", "head", "tail", "to_csv", "to_parquet"}
    )

    # Substrings (matched case-sensitively) whose presence confirms that a
    # "*gate*" command file really is a gate command.
    _GATE_PATTERNS = ("pass", "fail", "PASS", "FAIL", "Binary pass/fail", "gate")

    def __init__(self):
        self.parse_tools = ParseTools()

    @staticmethod
    def _has_errors(issues: list) -> bool:
        """Return True when at least one issue has ERROR severity."""
        return any(issue.severity == IssueSeverity.ERROR for issue in issues)

    async def _find_agent(
        self, agent_name: str, claude_md_path: str
    ) -> tuple[Optional[dict], Optional[dict]]:
        """Look up an agent by case-insensitive name in a CLAUDE.md file.

        Returns:
            ``(agent, None)`` on success, or ``(None, error_dict)`` when the
            file could not be parsed or the agent is not defined in it.
        """
        agents_result = await self.parse_tools.parse_claude_md_agents(claude_md_path)
        if "error" in agents_result:
            return None, {
                "error": agents_result["error"],
                "agent_name": agent_name,
            }

        agents = agents_result.get("agents", [])
        wanted = agent_name.lower()
        for candidate in agents:
            if candidate["name"].lower() == wanted:
                return candidate, None

        return None, {
            "error": f"Agent '{agent_name}' not found in {claude_md_path}",
            "agent_name": agent_name,
            "available_agents": [a["name"] for a in agents],
        }

    @staticmethod
    def _extract_gate_contract(content: str) -> Optional[str]:
        """Return the ``gate_contract`` value from a command file's frontmatter.

        Returns None when the content has no leading ``---`` frontmatter block
        or the block declares no (non-empty) ``gate_contract``.
        """
        import re

        frontmatter_match = re.match(r'^---\n(.*?)\n---', content, re.DOTALL)
        if not frontmatter_match:
            return None

        # Only spaces/tabs may separate the key from its value, so an empty
        # value cannot swallow the key on the following line.
        contract_match = re.search(
            r'gate_contract:[ \t]*(\S+)', frontmatter_match.group(1)
        )
        if not contract_match:
            return None

        # Frontmatter values may be quoted; compare on the bare version string.
        return contract_match.group(1).strip("\"'") or None

    def _scan_gate_command(self, commands_dir: Path) -> tuple[bool, Optional[str]]:
        """Locate a gate command in ``commands_dir`` and read its contract.

        Returns:
            ``(found, contract)`` for the first confirmed gate command, where
            ``contract`` is None if the command declares no contract version.
        """
        if not commands_dir.exists():
            return False, None

        # Sorted so the chosen file does not depend on filesystem order when
        # several gate commands exist.
        for cmd_file in sorted(commands_dir.glob("*.md")):
            if "gate" not in cmd_file.name.lower():
                continue
            # Verify it's actually a gate command by checking content
            content = cmd_file.read_text(encoding="utf-8", errors="replace")
            if any(pattern in content for pattern in self._GATE_PATTERNS):
                return True, self._extract_gate_contract(content)

        return False, None

    async def validate_compatibility(self, plugin_a: str, plugin_b: str) -> dict:
        """
        Validate compatibility between two plugin interfaces.

        Compares tools and commands to identify overlaps and gaps. Shared tool
        names produce a warning; shared command names produce an error and
        make the pair incompatible.

        Args:
            plugin_a: Path to first plugin directory
            plugin_b: Path to second plugin directory

        Returns:
            Compatibility report with shared tools, unique tools, and issues,
            or a dict with an "error" key if either plugin failed to parse
        """
        # Parse both plugins
        interface_a = await self.parse_tools.parse_plugin_interface(plugin_a)
        interface_b = await self.parse_tools.parse_plugin_interface(plugin_b)

        # Check for parse errors (plugin A is reported first)
        for label, interface in (("A", interface_a), ("B", interface_b)):
            if "error" in interface:
                return {
                    "error": f"Failed to parse plugin {label}: {interface['error']}",
                    "plugin_a": plugin_a,
                    "plugin_b": plugin_b,
                }

        # Extract tool names
        tools_a = {t["name"] for t in interface_a.get("tools", [])}
        tools_b = {t["name"] for t in interface_b.get("tools", [])}

        # Find overlaps and differences; sorted so reports are deterministic
        shared = sorted(tools_a & tools_b)
        a_only = sorted(tools_a - tools_b)
        b_only = sorted(tools_b - tools_a)

        location = f"{interface_a['plugin_name']} and {interface_b['plugin_name']}"
        issues = []

        # Check for potential naming conflicts
        if shared:
            issues.append(ValidationIssue(
                severity=IssueSeverity.WARNING,
                issue_type=IssueType.INTERFACE_MISMATCH,
                message=f"Both plugins define tools with same names: {shared}",
                location=location,
                suggestion="Ensure tools with same names have compatible interfaces",
            ))

        # Check command overlaps
        cmds_a = {c["name"] for c in interface_a.get("commands", [])}
        cmds_b = {c["name"] for c in interface_b.get("commands", [])}
        shared_cmds = sorted(cmds_a & cmds_b)
        if shared_cmds:
            issues.append(ValidationIssue(
                severity=IssueSeverity.ERROR,
                issue_type=IssueType.INTERFACE_MISMATCH,
                message=f"Command name conflict: {shared_cmds}",
                location=location,
                suggestion="Rename conflicting commands to avoid ambiguity",
            ))

        result = CompatibilityResult(
            plugin_a=interface_a["plugin_name"],
            plugin_b=interface_b["plugin_name"],
            compatible=not self._has_errors(issues),
            shared_tools=shared,
            a_only_tools=a_only,
            b_only_tools=b_only,
            issues=issues,
        )
        return result.model_dump()

    async def validate_agent_refs(
        self,
        agent_name: str,
        claude_md_path: str,
        plugin_paths: Optional[list[str]] = None
    ) -> dict:
        """
        Validate that all tool references in an agent definition exist.

        Args:
            agent_name: Name of the agent to validate (matched case-insensitively)
            claude_md_path: Path to CLAUDE.md containing the agent
            plugin_paths: Optional list of plugin paths to check for tools

        Returns:
            Validation result with found/missing tools and issues, or a dict
            with an "error" key if the file or agent could not be resolved
        """
        agent, error = await self._find_agent(agent_name, claude_md_path)
        if error is not None:
            return error

        # Collect all available tools from plugins; plugins that fail to
        # parse are skipped.
        available_tools = set()
        for plugin_path in plugin_paths or []:
            interface = await self.parse_tools.parse_plugin_interface(plugin_path)
            if "error" not in interface:
                available_tools.update(
                    tool["name"] for tool in interface.get("tools", [])
                )

        # Check agent tool references
        tool_refs = set(agent.get("tool_refs", []))
        if available_tools:
            found = tool_refs & available_tools
            missing = tool_refs - available_tools
        else:
            # Without a tool inventory there is nothing to check against, so
            # every reference is accepted as found.
            # NOTE(review): this branch is also taken when plugin_paths were
            # supplied but yielded no tools (e.g. all failed to parse) -
            # confirm that accepting all references is intended in that case.
            found = tool_refs
            missing = set()

        issues = []

        # Report missing tools (sorted for a stable issue order)
        for tool in sorted(missing):
            issues.append(ValidationIssue(
                severity=IssueSeverity.ERROR,
                issue_type=IssueType.MISSING_TOOL,
                message=f"Agent '{agent_name}' references tool '{tool}' which is not found",
                location=claude_md_path,
                suggestion=f"Check if tool '{tool}' exists or fix the reference",
            ))

        # Check if agent has no tool refs (might be incomplete)
        if not tool_refs:
            issues.append(ValidationIssue(
                severity=IssueSeverity.INFO,
                issue_type=IssueType.UNDECLARED_OUTPUT,
                message=f"Agent '{agent_name}' has no documented tool references",
                location=claude_md_path,
                suggestion="Consider documenting which tools this agent uses",
            ))

        result = AgentValidationResult(
            agent_name=agent_name,
            valid=not self._has_errors(issues),
            tool_refs_found=sorted(found),
            tool_refs_missing=sorted(missing),
            issues=issues,
        )
        return result.model_dump()

    async def validate_data_flow(self, agent_name: str, claude_md_path: str) -> dict:
        """
        Validate data flow through an agent's tool sequence.

        Lists the agent's workflow steps (falling back to its responsibilities)
        and warns when the agent references a known data_ref consumer tool
        without referencing any known data_ref producer tool.

        Args:
            agent_name: Name of the agent to validate (matched case-insensitively)
            claude_md_path: Path to CLAUDE.md containing the agent

        Returns:
            Data flow validation result with steps and issues, or a dict with
            an "error" key if the file or agent could not be resolved
        """
        agent, error = await self._find_agent(agent_name, claude_md_path)
        if error is not None:
            return error

        issues = []

        # Build flow from workflow steps or, failing that, responsibilities
        steps = agent.get("workflow_steps", []) or agent.get("responsibilities", [])
        flow_steps = [
            f"Step {number}: {step}" for number, step in enumerate(steps, start=1)
        ]

        # Check if agent uses tools that require data_ref
        tool_refs = agent.get("tool_refs", [])
        has_producer = any(t in self._DATA_REF_PRODUCERS for t in tool_refs)
        has_consumer = any(t in self._DATA_REF_CONSUMERS for t in tool_refs)

        if has_consumer and not has_producer:
            issues.append(ValidationIssue(
                severity=IssueSeverity.WARNING,
                issue_type=IssueType.INTERFACE_MISMATCH,
                message=f"Agent '{agent_name}' uses tools that consume data_ref but no producer found",
                location=claude_md_path,
                suggestion="Ensure a data loading tool (read_csv, pg_query, etc.) is used before data consumers",
            ))

        # Check for empty workflow
        if not steps and not tool_refs:
            issues.append(ValidationIssue(
                severity=IssueSeverity.INFO,
                issue_type=IssueType.UNDECLARED_OUTPUT,
                message=f"Agent '{agent_name}' has no documented workflow or tool sequence",
                location=claude_md_path,
                suggestion="Consider documenting the agent's workflow steps",
            ))

        result = DataFlowResult(
            agent_name=agent_name,
            valid=not self._has_errors(issues),
            flow_steps=flow_steps,
            issues=issues,
        )
        return result.model_dump()

    async def validate_workflow_integration(
        self,
        plugin_path: str,
        domain_label: str,
        expected_contract: Optional[str] = None
    ) -> dict:
        """
        Validate that a domain plugin exposes required advisory interfaces.

        Checks for:
        - Gate command (e.g., /design-gate, /data-gate) - REQUIRED
        - Gate contract version (gate_contract in frontmatter) - INFO if missing
        - Review command (e.g., /design-review, /data-review) - recommended
        - Advisory agent referencing the domain label - recommended

        Args:
            plugin_path: Path to the domain plugin directory
            domain_label: The Domain/* label it claims to handle (e.g., Domain/Viz)
            expected_contract: Expected contract version (e.g., 'v1'). If provided,
                validates the gate command's contract matches.

        Returns:
            Validation result with found interfaces and issues, or a dict with
            an "error" key if the plugin directory does not exist
        """
        plugin_path_obj = Path(plugin_path)
        if not plugin_path_obj.exists():
            return {
                "error": f"Plugin directory not found: {plugin_path}",
                "plugin_path": plugin_path,
                "domain_label": domain_label,
            }

        issues = []

        # Extract plugin name from path
        plugin_name = plugin_path_obj.name

        # Extract domain short name from label
        # (e.g., "Domain/Viz" -> "viz", "Domain/Data" -> "data")
        domain_short = domain_label.split("/")[-1].lower()

        commands_dir = plugin_path_obj / "commands"
        agents_dir = plugin_path_obj / "agents"

        # Check for gate command
        gate_command_found, gate_contract = self._scan_gate_command(commands_dir)
        if not gate_command_found:
            issues.append(ValidationIssue(
                severity=IssueSeverity.ERROR,
                issue_type=IssueType.MISSING_INTEGRATION,
                message=f"Plugin '{plugin_name}' lacks a gate command for domain '{domain_label}'",
                location=str(commands_dir),
                suggestion=f"Create commands/{domain_short}-gate.md with binary PASS/FAIL output",
            ))

        # Check for review command (a "*review*" file that is not a gate file)
        review_command_found = commands_dir.exists() and any(
            "review" in cmd_file.name.lower() and "gate" not in cmd_file.name.lower()
            for cmd_file in commands_dir.glob("*.md")
        )
        if not review_command_found:
            issues.append(ValidationIssue(
                severity=IssueSeverity.WARNING,
                issue_type=IssueType.MISSING_INTEGRATION,
                message=f"Plugin '{plugin_name}' lacks a review command for domain '{domain_label}'",
                location=str(commands_dir),
                suggestion=f"Create commands/{domain_short}-review.md for detailed audits",
            ))

        # Check for advisory agent
        advisory_agent_found = False
        if agents_dir.exists():
            for agent_file in agents_dir.glob("*.md"):
                file_name = agent_file.name.lower()
                # An advisor/reviewer file name is sufficient on its own
                if "advisor" in file_name or "reviewer" in file_name:
                    advisory_agent_found = True
                    break
                # Otherwise the agent must reference the domain label or gate command
                content = agent_file.read_text(encoding="utf-8", errors="replace")
                if domain_label in content or f"{domain_short}-gate" in content.lower():
                    advisory_agent_found = True
                    break
        if not advisory_agent_found:
            issues.append(ValidationIssue(
                severity=IssueSeverity.WARNING,
                issue_type=IssueType.MISSING_INTEGRATION,
                message=f"Plugin '{plugin_name}' lacks an advisory agent for domain '{domain_label}'",
                location=str(agents_dir) if agents_dir.exists() else str(plugin_path_obj),
                suggestion=f"Create agents/{domain_short}-advisor.md referencing '{domain_label}'",
            ))

        # Check gate contract version
        if gate_command_found:
            if not gate_contract:
                issues.append(ValidationIssue(
                    severity=IssueSeverity.INFO,
                    issue_type=IssueType.MISSING_INTEGRATION,
                    message="Gate command does not declare a contract version",
                    location=str(commands_dir),
                    suggestion="Consider adding `gate_contract: v1` to frontmatter for version tracking",
                ))
            elif expected_contract and gate_contract != expected_contract:
                issues.append(ValidationIssue(
                    severity=IssueSeverity.WARNING,
                    issue_type=IssueType.INTERFACE_MISMATCH,
                    message=f"Contract version mismatch: gate declares {gate_contract}, projman expects {expected_contract}",
                    location=str(commands_dir),
                    suggestion=f"Update domain-consultation.md Gate Command Reference table to {gate_contract}, or update gate command to {expected_contract}",
                ))

        result = WorkflowIntegrationResult(
            plugin_name=plugin_name,
            domain_label=domain_label,
            valid=gate_command_found,  # Only gate is required for validity
            gate_command_found=gate_command_found,
            gate_contract=gate_contract,
            review_command_found=review_command_found,
            advisory_agent_found=advisory_agent_found,
            issues=issues,
        )
        return result.model_dump()