- design-gate.md and data-gate.md declare gate_contract: v1 - domain-consultation.md Gate Command Reference includes Contract column - validate_workflow_integration now checks contract version compatibility - Tests added for match, mismatch, and missing contract scenarios Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
494 lines
18 KiB
Python
494 lines
18 KiB
Python
"""
|
|
Validation tools for checking cross-plugin compatibility and agent references.
|
|
|
|
Provides:
|
|
- validate_compatibility: Compare two plugin interfaces
|
|
- validate_agent_refs: Check agent tool references exist
|
|
- validate_data_flow: Verify data flow through agent sequences
|
|
"""
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
from pydantic import BaseModel
|
|
from enum import Enum
|
|
|
|
from .parse_tools import ParseTools, PluginInterface, ClaudeMdAgent
|
|
|
|
|
|
class IssueSeverity(str, Enum):
|
|
ERROR = "error"
|
|
WARNING = "warning"
|
|
INFO = "info"
|
|
|
|
|
|
class IssueType(str, Enum):
|
|
MISSING_TOOL = "missing_tool"
|
|
INTERFACE_MISMATCH = "interface_mismatch"
|
|
OPTIONAL_DEPENDENCY = "optional_dependency"
|
|
UNDECLARED_OUTPUT = "undeclared_output"
|
|
INVALID_SEQUENCE = "invalid_sequence"
|
|
MISSING_INTEGRATION = "missing_integration"
|
|
|
|
|
|
class ValidationIssue(BaseModel):
|
|
"""A single validation issue"""
|
|
severity: IssueSeverity
|
|
issue_type: IssueType
|
|
message: str
|
|
location: Optional[str] = None
|
|
suggestion: Optional[str] = None
|
|
|
|
|
|
class CompatibilityResult(BaseModel):
|
|
"""Result of compatibility check between two plugins"""
|
|
plugin_a: str
|
|
plugin_b: str
|
|
compatible: bool
|
|
shared_tools: list[str] = []
|
|
a_only_tools: list[str] = []
|
|
b_only_tools: list[str] = []
|
|
issues: list[ValidationIssue] = []
|
|
|
|
|
|
class AgentValidationResult(BaseModel):
|
|
"""Result of agent reference validation"""
|
|
agent_name: str
|
|
valid: bool
|
|
tool_refs_found: list[str] = []
|
|
tool_refs_missing: list[str] = []
|
|
issues: list[ValidationIssue] = []
|
|
|
|
|
|
class DataFlowResult(BaseModel):
|
|
"""Result of data flow validation"""
|
|
agent_name: str
|
|
valid: bool
|
|
flow_steps: list[str] = []
|
|
issues: list[ValidationIssue] = []
|
|
|
|
|
|
class WorkflowIntegrationResult(BaseModel):
|
|
"""Result of workflow integration validation for domain plugins"""
|
|
plugin_name: str
|
|
domain_label: str
|
|
valid: bool
|
|
gate_command_found: bool
|
|
gate_contract: Optional[str] = None # Contract version declared by gate command
|
|
review_command_found: bool
|
|
advisory_agent_found: bool
|
|
issues: list[ValidationIssue] = []
|
|
|
|
|
|
class ValidationTools:
|
|
"""Tools for validating plugin compatibility and agent references"""
|
|
|
|
def __init__(self):
|
|
self.parse_tools = ParseTools()
|
|
|
|
async def validate_compatibility(self, plugin_a: str, plugin_b: str) -> dict:
|
|
"""
|
|
Validate compatibility between two plugin interfaces.
|
|
|
|
Compares tools, commands, and agents to identify overlaps and gaps.
|
|
|
|
Args:
|
|
plugin_a: Path to first plugin directory
|
|
plugin_b: Path to second plugin directory
|
|
|
|
Returns:
|
|
Compatibility report with shared tools, unique tools, and issues
|
|
"""
|
|
# Parse both plugins
|
|
interface_a = await self.parse_tools.parse_plugin_interface(plugin_a)
|
|
interface_b = await self.parse_tools.parse_plugin_interface(plugin_b)
|
|
|
|
# Check for parse errors
|
|
if "error" in interface_a:
|
|
return {
|
|
"error": f"Failed to parse plugin A: {interface_a['error']}",
|
|
"plugin_a": plugin_a,
|
|
"plugin_b": plugin_b
|
|
}
|
|
if "error" in interface_b:
|
|
return {
|
|
"error": f"Failed to parse plugin B: {interface_b['error']}",
|
|
"plugin_a": plugin_a,
|
|
"plugin_b": plugin_b
|
|
}
|
|
|
|
# Extract tool names
|
|
tools_a = set(t["name"] for t in interface_a.get("tools", []))
|
|
tools_b = set(t["name"] for t in interface_b.get("tools", []))
|
|
|
|
# Find overlaps and differences
|
|
shared = tools_a & tools_b
|
|
a_only = tools_a - tools_b
|
|
b_only = tools_b - tools_a
|
|
|
|
issues = []
|
|
|
|
# Check for potential naming conflicts
|
|
if shared:
|
|
issues.append(ValidationIssue(
|
|
severity=IssueSeverity.WARNING,
|
|
issue_type=IssueType.INTERFACE_MISMATCH,
|
|
message=f"Both plugins define tools with same names: {list(shared)}",
|
|
location=f"{interface_a['plugin_name']} and {interface_b['plugin_name']}",
|
|
suggestion="Ensure tools with same names have compatible interfaces"
|
|
))
|
|
|
|
# Check command overlaps
|
|
cmds_a = set(c["name"] for c in interface_a.get("commands", []))
|
|
cmds_b = set(c["name"] for c in interface_b.get("commands", []))
|
|
shared_cmds = cmds_a & cmds_b
|
|
|
|
if shared_cmds:
|
|
issues.append(ValidationIssue(
|
|
severity=IssueSeverity.ERROR,
|
|
issue_type=IssueType.INTERFACE_MISMATCH,
|
|
message=f"Command name conflict: {list(shared_cmds)}",
|
|
location=f"{interface_a['plugin_name']} and {interface_b['plugin_name']}",
|
|
suggestion="Rename conflicting commands to avoid ambiguity"
|
|
))
|
|
|
|
result = CompatibilityResult(
|
|
plugin_a=interface_a["plugin_name"],
|
|
plugin_b=interface_b["plugin_name"],
|
|
compatible=len([i for i in issues if i.severity == IssueSeverity.ERROR]) == 0,
|
|
shared_tools=list(shared),
|
|
a_only_tools=list(a_only),
|
|
b_only_tools=list(b_only),
|
|
issues=issues
|
|
)
|
|
|
|
return result.model_dump()
|
|
|
|
async def validate_agent_refs(
|
|
self,
|
|
agent_name: str,
|
|
claude_md_path: str,
|
|
plugin_paths: list[str] = None
|
|
) -> dict:
|
|
"""
|
|
Validate that all tool references in an agent definition exist.
|
|
|
|
Args:
|
|
agent_name: Name of the agent to validate
|
|
claude_md_path: Path to CLAUDE.md containing the agent
|
|
plugin_paths: Optional list of plugin paths to check for tools
|
|
|
|
Returns:
|
|
Validation result with found/missing tools and issues
|
|
"""
|
|
# Parse CLAUDE.md for agents
|
|
agents_result = await self.parse_tools.parse_claude_md_agents(claude_md_path)
|
|
|
|
if "error" in agents_result:
|
|
return {
|
|
"error": agents_result["error"],
|
|
"agent_name": agent_name
|
|
}
|
|
|
|
# Find the specific agent
|
|
agent = None
|
|
for a in agents_result.get("agents", []):
|
|
if a["name"].lower() == agent_name.lower():
|
|
agent = a
|
|
break
|
|
|
|
if not agent:
|
|
return {
|
|
"error": f"Agent '{agent_name}' not found in {claude_md_path}",
|
|
"agent_name": agent_name,
|
|
"available_agents": [a["name"] for a in agents_result.get("agents", [])]
|
|
}
|
|
|
|
# Collect all available tools from plugins
|
|
available_tools = set()
|
|
if plugin_paths:
|
|
for plugin_path in plugin_paths:
|
|
interface = await self.parse_tools.parse_plugin_interface(plugin_path)
|
|
if "error" not in interface:
|
|
for tool in interface.get("tools", []):
|
|
available_tools.add(tool["name"])
|
|
|
|
# Check agent tool references
|
|
tool_refs = set(agent.get("tool_refs", []))
|
|
found = tool_refs & available_tools if available_tools else tool_refs
|
|
missing = tool_refs - available_tools if available_tools else set()
|
|
|
|
issues = []
|
|
|
|
# Report missing tools
|
|
for tool in missing:
|
|
issues.append(ValidationIssue(
|
|
severity=IssueSeverity.ERROR,
|
|
issue_type=IssueType.MISSING_TOOL,
|
|
message=f"Agent '{agent_name}' references tool '{tool}' which is not found",
|
|
location=claude_md_path,
|
|
suggestion=f"Check if tool '{tool}' exists or fix the reference"
|
|
))
|
|
|
|
# Check if agent has no tool refs (might be incomplete)
|
|
if not tool_refs:
|
|
issues.append(ValidationIssue(
|
|
severity=IssueSeverity.INFO,
|
|
issue_type=IssueType.UNDECLARED_OUTPUT,
|
|
message=f"Agent '{agent_name}' has no documented tool references",
|
|
location=claude_md_path,
|
|
suggestion="Consider documenting which tools this agent uses"
|
|
))
|
|
|
|
result = AgentValidationResult(
|
|
agent_name=agent_name,
|
|
valid=len([i for i in issues if i.severity == IssueSeverity.ERROR]) == 0,
|
|
tool_refs_found=list(found),
|
|
tool_refs_missing=list(missing),
|
|
issues=issues
|
|
)
|
|
|
|
return result.model_dump()
|
|
|
|
async def validate_data_flow(self, agent_name: str, claude_md_path: str) -> dict:
|
|
"""
|
|
Validate data flow through an agent's tool sequence.
|
|
|
|
Checks that each step's expected output can be used by the next step.
|
|
|
|
Args:
|
|
agent_name: Name of the agent to validate
|
|
claude_md_path: Path to CLAUDE.md containing the agent
|
|
|
|
Returns:
|
|
Data flow validation result with steps and issues
|
|
"""
|
|
# Parse CLAUDE.md for agents
|
|
agents_result = await self.parse_tools.parse_claude_md_agents(claude_md_path)
|
|
|
|
if "error" in agents_result:
|
|
return {
|
|
"error": agents_result["error"],
|
|
"agent_name": agent_name
|
|
}
|
|
|
|
# Find the specific agent
|
|
agent = None
|
|
for a in agents_result.get("agents", []):
|
|
if a["name"].lower() == agent_name.lower():
|
|
agent = a
|
|
break
|
|
|
|
if not agent:
|
|
return {
|
|
"error": f"Agent '{agent_name}' not found in {claude_md_path}",
|
|
"agent_name": agent_name,
|
|
"available_agents": [a["name"] for a in agents_result.get("agents", [])]
|
|
}
|
|
|
|
issues = []
|
|
flow_steps = []
|
|
|
|
# Extract workflow steps
|
|
workflow_steps = agent.get("workflow_steps", [])
|
|
responsibilities = agent.get("responsibilities", [])
|
|
|
|
# Build flow from workflow steps or responsibilities
|
|
steps = workflow_steps if workflow_steps else responsibilities
|
|
|
|
for i, step in enumerate(steps):
|
|
flow_steps.append(f"Step {i+1}: {step}")
|
|
|
|
# Check for data flow patterns
|
|
tool_refs = agent.get("tool_refs", [])
|
|
|
|
# Known data flow patterns
|
|
# e.g., data-platform produces data_ref, viz-platform consumes it
|
|
known_producers = {
|
|
"read_csv": "data_ref",
|
|
"read_parquet": "data_ref",
|
|
"pg_query": "data_ref",
|
|
"filter": "data_ref",
|
|
"groupby": "data_ref",
|
|
}
|
|
|
|
known_consumers = {
|
|
"describe": "data_ref",
|
|
"head": "data_ref",
|
|
"tail": "data_ref",
|
|
"to_csv": "data_ref",
|
|
"to_parquet": "data_ref",
|
|
}
|
|
|
|
# Check if agent uses tools that require data_ref
|
|
has_producer = any(t in known_producers for t in tool_refs)
|
|
has_consumer = any(t in known_consumers for t in tool_refs)
|
|
|
|
if has_consumer and not has_producer:
|
|
issues.append(ValidationIssue(
|
|
severity=IssueSeverity.WARNING,
|
|
issue_type=IssueType.INTERFACE_MISMATCH,
|
|
message=f"Agent '{agent_name}' uses tools that consume data_ref but no producer found",
|
|
location=claude_md_path,
|
|
suggestion="Ensure a data loading tool (read_csv, pg_query, etc.) is used before data consumers"
|
|
))
|
|
|
|
# Check for empty workflow
|
|
if not steps and not tool_refs:
|
|
issues.append(ValidationIssue(
|
|
severity=IssueSeverity.INFO,
|
|
issue_type=IssueType.UNDECLARED_OUTPUT,
|
|
message=f"Agent '{agent_name}' has no documented workflow or tool sequence",
|
|
location=claude_md_path,
|
|
suggestion="Consider documenting the agent's workflow steps"
|
|
))
|
|
|
|
result = DataFlowResult(
|
|
agent_name=agent_name,
|
|
valid=len([i for i in issues if i.severity == IssueSeverity.ERROR]) == 0,
|
|
flow_steps=flow_steps,
|
|
issues=issues
|
|
)
|
|
|
|
return result.model_dump()
|
|
|
|
async def validate_workflow_integration(
|
|
self,
|
|
plugin_path: str,
|
|
domain_label: str,
|
|
expected_contract: Optional[str] = None
|
|
) -> dict:
|
|
"""
|
|
Validate that a domain plugin exposes required advisory interfaces.
|
|
|
|
Checks for:
|
|
- Gate command (e.g., /design-gate, /data-gate) - REQUIRED
|
|
- Gate contract version (gate_contract in frontmatter) - INFO if missing
|
|
- Review command (e.g., /design-review, /data-review) - recommended
|
|
- Advisory agent referencing the domain label - recommended
|
|
|
|
Args:
|
|
plugin_path: Path to the domain plugin directory
|
|
domain_label: The Domain/* label it claims to handle (e.g., Domain/Viz)
|
|
expected_contract: Expected contract version (e.g., 'v1'). If provided,
|
|
validates the gate command's contract matches.
|
|
|
|
Returns:
|
|
Validation result with found interfaces and issues
|
|
"""
|
|
import re
|
|
|
|
plugin_path_obj = Path(plugin_path)
|
|
issues = []
|
|
|
|
# Extract plugin name from path
|
|
plugin_name = plugin_path_obj.name
|
|
if not plugin_path_obj.exists():
|
|
return {
|
|
"error": f"Plugin directory not found: {plugin_path}",
|
|
"plugin_path": plugin_path,
|
|
"domain_label": domain_label
|
|
}
|
|
|
|
# Extract domain short name from label (e.g., "Domain/Viz" -> "viz", "Domain/Data" -> "data")
|
|
domain_short = domain_label.split("/")[-1].lower() if "/" in domain_label else domain_label.lower()
|
|
|
|
# Check for gate command
|
|
commands_dir = plugin_path_obj / "commands"
|
|
gate_command_found = False
|
|
gate_contract = None
|
|
gate_patterns = ["pass", "fail", "PASS", "FAIL", "Binary pass/fail", "gate"]
|
|
|
|
if commands_dir.exists():
|
|
for cmd_file in commands_dir.glob("*.md"):
|
|
if "gate" in cmd_file.name.lower():
|
|
# Verify it's actually a gate command by checking content
|
|
content = cmd_file.read_text()
|
|
if any(pattern in content for pattern in gate_patterns):
|
|
gate_command_found = True
|
|
# Parse frontmatter for gate_contract
|
|
frontmatter_match = re.match(r'^---\n(.*?)\n---', content, re.DOTALL)
|
|
if frontmatter_match:
|
|
frontmatter = frontmatter_match.group(1)
|
|
contract_match = re.search(r'gate_contract:\s*(\S+)', frontmatter)
|
|
if contract_match:
|
|
gate_contract = contract_match.group(1)
|
|
break
|
|
|
|
if not gate_command_found:
|
|
issues.append(ValidationIssue(
|
|
severity=IssueSeverity.ERROR,
|
|
issue_type=IssueType.MISSING_INTEGRATION,
|
|
message=f"Plugin '{plugin_name}' lacks a gate command for domain '{domain_label}'",
|
|
location=str(commands_dir),
|
|
suggestion=f"Create commands/{domain_short}-gate.md with binary PASS/FAIL output"
|
|
))
|
|
|
|
# Check for review command
|
|
review_command_found = False
|
|
if commands_dir.exists():
|
|
for cmd_file in commands_dir.glob("*.md"):
|
|
if "review" in cmd_file.name.lower() and "gate" not in cmd_file.name.lower():
|
|
review_command_found = True
|
|
break
|
|
|
|
if not review_command_found:
|
|
issues.append(ValidationIssue(
|
|
severity=IssueSeverity.WARNING,
|
|
issue_type=IssueType.MISSING_INTEGRATION,
|
|
message=f"Plugin '{plugin_name}' lacks a review command for domain '{domain_label}'",
|
|
location=str(commands_dir),
|
|
suggestion=f"Create commands/{domain_short}-review.md for detailed audits"
|
|
))
|
|
|
|
# Check for advisory agent
|
|
agents_dir = plugin_path_obj / "agents"
|
|
advisory_agent_found = False
|
|
|
|
if agents_dir.exists():
|
|
for agent_file in agents_dir.glob("*.md"):
|
|
content = agent_file.read_text()
|
|
# Check if agent references the domain label or gate command
|
|
if domain_label in content or f"{domain_short}-gate" in content.lower() or "advisor" in agent_file.name.lower() or "reviewer" in agent_file.name.lower():
|
|
advisory_agent_found = True
|
|
break
|
|
|
|
if not advisory_agent_found:
|
|
issues.append(ValidationIssue(
|
|
severity=IssueSeverity.WARNING,
|
|
issue_type=IssueType.MISSING_INTEGRATION,
|
|
message=f"Plugin '{plugin_name}' lacks an advisory agent for domain '{domain_label}'",
|
|
location=str(agents_dir) if agents_dir.exists() else str(plugin_path_obj),
|
|
suggestion=f"Create agents/{domain_short}-advisor.md referencing '{domain_label}'"
|
|
))
|
|
|
|
# Check gate contract version
|
|
if gate_command_found:
|
|
if not gate_contract:
|
|
issues.append(ValidationIssue(
|
|
severity=IssueSeverity.INFO,
|
|
issue_type=IssueType.MISSING_INTEGRATION,
|
|
message=f"Gate command does not declare a contract version",
|
|
location=str(commands_dir),
|
|
suggestion="Consider adding `gate_contract: v1` to frontmatter for version tracking"
|
|
))
|
|
elif expected_contract and gate_contract != expected_contract:
|
|
issues.append(ValidationIssue(
|
|
severity=IssueSeverity.WARNING,
|
|
issue_type=IssueType.INTERFACE_MISMATCH,
|
|
message=f"Contract version mismatch: gate declares {gate_contract}, projman expects {expected_contract}",
|
|
location=str(commands_dir),
|
|
suggestion=f"Update domain-consultation.md Gate Command Reference table to {gate_contract}, or update gate command to {expected_contract}"
|
|
))
|
|
|
|
result = WorkflowIntegrationResult(
|
|
plugin_name=plugin_name,
|
|
domain_label=domain_label,
|
|
valid=gate_command_found, # Only gate is required for validity
|
|
gate_command_found=gate_command_found,
|
|
gate_contract=gate_contract,
|
|
review_command_found=review_command_found,
|
|
advisory_agent_found=advisory_agent_found,
|
|
issues=issues
|
|
)
|
|
|
|
return result.model_dump()
|