From 01f9ca1d422d932dc9e63ba6a73aa16c2e80fbcb Mon Sep 17 00:00:00 2001
From: Leo Miranda
Date: Sun, 25 Jan 2026 19:01:08 +0000
Subject: [PATCH] Add "Change V04.0.0: Proposal (Implementation 1)"

---
 ....0.0%3A-Proposal-%28Implementation-1%29.md | 452 ++++++++++++++++++
 1 file changed, 452 insertions(+)
 create mode 100644 Change-V04.0.0%3A-Proposal-%28Implementation-1%29.md

diff --git a/Change-V04.0.0%3A-Proposal-%28Implementation-1%29.md b/Change-V04.0.0%3A-Proposal-%28Implementation-1%29.md
new file mode 100644
index 0000000..79cb738
--- /dev/null
+++ b/Change-V04.0.0%3A-Proposal-%28Implementation-1%29.md
@@ -0,0 +1,452 @@

# data-platform Plugin Implementation Plan (v4.0.0)

> **Origin:** [Change V04.0.0: Proposal](Change-V04.0.0:-Proposal)
> **Status:** Implemented
> **Date:** 2026-01-25

---

## Overview

Implement a new `data-platform` plugin for leo-claude-mktplace that addresses data workflow issues encountered in the personal-portfolio project:
- Lost data after multiple interactions (solved by Arrow IPC data_ref passing)
- dbt 1.9+ syntax deprecation (solved by pre-execution validation with `dbt parse`)
- Ungraceful PostgreSQL error handling (solved by SessionStart hook with warnings)

## Architecture Decisions

| Decision | Choice |
|----------|--------|
| Data Passing | Arrow IPC with data_ref |
| DB Auth | Environment variables (~/.config/claude/postgres.env) |
| dbt Discovery | Auto-detect + explicit override |
| dbt Validation | Pre-execution (`dbt parse`) |
| Plugin Structure | Single plugin, 3 MCP servers |
| Server Location | Root mcp-servers/ |
| Memory Management | 100k row limit with chunking |
| PostGIS Support | Yes, with geoalchemy2 |
| Agent Model | 2 agents (Ingestion + Analysis) |
| Commands | Core 6 |
| Startup Hook | Graceful DB warning (non-blocking) |
| MCP Framework | Manual SDK (following gitea pattern) |

## File Structure

```
mcp-servers/
└── data-platform/
    ├── mcp_server/
    │   ├── __init__.py
    │   ├── server.py           # Main MCP server with routing
    │   ├── config.py           # Hybrid config (system + project)
    │   ├── data_store.py       # Arrow IPC DataFrame registry
    │   ├── pandas_tools.py     # pandas tool implementations
    │   ├── postgres_tools.py   # PostgreSQL/PostGIS tools
    │   └── dbt_tools.py        # dbt CLI wrapper tools
    ├── requirements.txt
    ├── pyproject.toml
    └── README.md

plugins/
└── data-platform/
    ├── .claude-plugin/
    │   └── plugin.json
    ├── .mcp.json
    ├── mcp-servers/
    │   └── data-platform -> ../../../mcp-servers/data-platform  # symlink
    ├── commands/
    │   ├── ingest.md           # /ingest command
    │   ├── profile.md          # /profile command
    │   ├── schema.md           # /schema command
    │   ├── explain.md          # /explain command
    │   ├── lineage.md          # /lineage command
    │   └── run.md              # /run command
    ├── agents/
    │   ├── data-ingestion.md   # Data loading and transformation
    │   └── data-analysis.md    # Exploration and profiling
    ├── hooks/
    │   └── hooks.json          # SessionStart DB check
    ├── README.md
    └── claude-md-integration.md
```

## Implementation Phases

### Phase 1: Foundation (Issues #1-2)

**Files to create:**
- `mcp-servers/data-platform/mcp_server/__init__.py`
- `mcp-servers/data-platform/mcp_server/config.py`
- `mcp-servers/data-platform/mcp_server/data_store.py`
- `mcp-servers/data-platform/mcp_server/server.py` (skeleton)
- `mcp-servers/data-platform/requirements.txt`
- `mcp-servers/data-platform/pyproject.toml`

**config.py pattern** (from gitea):
```python
import os
from pathlib import Path

from dotenv import load_dotenv

def load_config():
    # System-level credentials
    system_env = Path.home() / ".config/claude/postgres.env"
    if system_env.exists():
        load_dotenv(system_env)

    # Project-level settings
    project_env = Path.cwd() / ".env"
    if project_env.exists():
        load_dotenv(project_env, override=True)

    return {
        "postgres_url": os.getenv("POSTGRES_URL"),
        "dbt_project_dir": os.getenv("DBT_PROJECT_DIR"),
        "dbt_profiles_dir": os.getenv("DBT_PROFILES_DIR"),
    }
```

**data_store.py** (Arrow IPC registry):
```python
import pyarrow as pa
import uuid
from typing import Dict, Optional

class DataStore:
    _instance = None
    _dataframes: Dict[str, pa.Table] = {}

    @classmethod
    def get_instance(cls):
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance

    def store(self, df: pa.Table, name: Optional[str] = None) -> str:
        data_ref = name or f"df_{uuid.uuid4().hex[:8]}"
        self._dataframes[data_ref] = df
        return data_ref

    def get(self, data_ref: str) -> Optional[pa.Table]:
        return self._dataframes.get(data_ref)

    def list_refs(self) -> list:
        return [{"ref": k, "rows": v.num_rows, "cols": v.num_columns}
                for k, v in self._dataframes.items()]
```
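
The plan lists `server.py` as a skeleton but does not show it. A rough sketch of the routing shell, assuming the low-level interface of the official `mcp` Python SDK (the decisions table calls for the manual-SDK, gitea-pattern approach); only `list_data` is wired up here, and the real server would register and route the full pandas/postgres/dbt tool sets:

```python
# server.py sketch (assumption: low-level `mcp` SDK interface; mirror the
# gitea server for the authoritative registration pattern).
import asyncio

import mcp.types as types
from mcp.server import Server
from mcp.server.stdio import stdio_server

from .data_store import DataStore

app = Server("data-platform")

@app.list_tools()
async def list_tools() -> list[types.Tool]:
    # The full implementation would aggregate tool definitions from
    # pandas_tools, postgres_tools, and dbt_tools.
    return [
        types.Tool(
            name="list_data",
            description="List all stored DataFrames",
            inputSchema={"type": "object", "properties": {}},
        )
    ]

@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[types.TextContent]:
    # Routing: dispatch on tool name to the matching tool module.
    if name == "list_data":
        refs = DataStore.get_instance().list_refs()
        return [types.TextContent(type="text", text=str(refs))]
    raise ValueError(f"Unknown tool: {name}")

async def main():
    async with stdio_server() as (read_stream, write_stream):
        await app.run(read_stream, write_stream, app.create_initialization_options())

if __name__ == "__main__":
    asyncio.run(main())
```

The gitea MCP server remains the reference for how tool registration is actually structured in this marketplace.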
### Phase 2: pandas-mcp Tools (Issue #3)

**Tools to implement in pandas_tools.py:**

| Tool | Description |
|------|-------------|
| `read_csv` | Load CSV with optional chunking |
| `read_parquet` | Load Parquet files |
| `read_json` | Load JSON/JSONL files |
| `to_csv` | Export DataFrame to CSV |
| `to_parquet` | Export DataFrame to Parquet |
| `describe` | Statistical summary |
| `head` | First N rows |
| `tail` | Last N rows |
| `filter` | Filter rows by condition |
| `select` | Select columns |
| `groupby` | Group and aggregate |
| `join` | Join two DataFrames |
| `list_data` | List all stored DataFrames |
| `drop_data` | Remove DataFrame from store |

**Memory management:**
```python
from typing import Optional

import pandas as pd
import pyarrow as pa

from .data_store import DataStore

MAX_ROWS = 100_000

def read_csv(file_path: str, chunk_size: Optional[int] = None) -> dict:
    # chunk_size selects the chunked loading path (not shown here); the default
    # path loads the whole file and enforces the row limit below.
    df = pd.read_csv(file_path)
    if len(df) > MAX_ROWS:
        return {
            "warning": f"DataFrame has {len(df)} rows, exceeds {MAX_ROWS} limit",
            "suggestion": f"Use chunk_size={MAX_ROWS} for chunked processing",
            "preview": df.head(100).to_dict()
        }
    # Convert to Arrow and store
    table = pa.Table.from_pandas(df)
    data_ref = DataStore.get_instance().store(table)
    return {"data_ref": data_ref, "rows": len(df), "columns": list(df.columns)}
```

### Phase 3: postgres-mcp Tools (Issue #4)

**Tools to implement in postgres_tools.py:**

| Tool | Description |
|------|-------------|
| `pg_connect` | Test connection and return status |
| `pg_query` | Execute SELECT, return as data_ref |
| `pg_execute` | Execute INSERT/UPDATE/DELETE |
| `pg_tables` | List all tables in schema |
| `pg_columns` | Get column info for table |
| `pg_schemas` | List all schemas |
| `st_tables` | List PostGIS-enabled tables |
| `st_geometry_type` | Get geometry type of column |
| `st_srid` | Get SRID of geometry column |
| `st_extent` | Get bounding box of geometries |

**asyncpg implementation:**
```python
from typing import Optional

import asyncpg
import pandas as pd
import pyarrow as pa

from .config import load_config
from .data_store import DataStore
from .pandas_tools import MAX_ROWS  # shared 100k row limit
# geoalchemy2 is used by the st_* tools for geometry handling (not shown here)

async def pg_query(query: str, params: Optional[list] = None) -> dict:
    config = load_config()
    conn = await asyncpg.connect(config["postgres_url"])
    try:
        rows = await conn.fetch(query, *(params or []))
        df = pd.DataFrame([dict(r) for r in rows])
        if len(df) > MAX_ROWS:
            # store_truncated (not shown) stores only a preview of the result
            return {"warning": "Result truncated", "data_ref": store_truncated(df)}
        table = pa.Table.from_pandas(df)
        data_ref = DataStore.get_instance().store(table)
        return {"data_ref": data_ref, "rows": len(df)}
    finally:
        await conn.close()
```
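
The SessionStart hook configured in Phase 5 below invokes a `check_connection()` entry point from `postgres_tools.py` that the plan does not otherwise spell out. A minimal sketch, assuming it should only print a status line and never block or raise (per the graceful, non-blocking startup-hook decision):

```python
import asyncio

import asyncpg

from .config import load_config

def check_connection() -> None:
    # Invoked by the SessionStart hook; must warn, never fail the session.
    config = load_config()
    url = config.get("postgres_url")
    if not url:
        print("data-platform: POSTGRES_URL not set; PostgreSQL tools disabled")
        return
    try:
        async def _ping():
            conn = await asyncpg.connect(url, timeout=3)
            await conn.close()
        asyncio.run(_ping())
        print("data-platform: PostgreSQL connection OK")
    except Exception as exc:
        print(f"data-platform: WARNING - PostgreSQL unavailable ({exc})")
```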
### Phase 4: dbt-mcp Tools (Issue #5)

**Tools to implement in dbt_tools.py:**

| Tool | Description |
|------|-------------|
| `dbt_parse` | Validate project (pre-execution) |
| `dbt_run` | Run models with selection |
| `dbt_test` | Run tests |
| `dbt_build` | Run + test |
| `dbt_compile` | Compile SQL without executing |
| `dbt_ls` | List resources |
| `dbt_docs_generate` | Generate documentation |
| `dbt_lineage` | Get model dependencies |

**Pre-execution validation pattern:**
```python
import subprocess
from typing import Optional

from .config import load_config

def dbt_run(select: Optional[str] = None, exclude: Optional[str] = None) -> dict:
    config = load_config()
    # find_dbt_project (not shown) auto-detects the project directory
    project_dir = config.get("dbt_project_dir") or find_dbt_project()

    # ALWAYS validate first
    parse_result = subprocess.run(
        ["dbt", "parse", "--project-dir", project_dir],
        capture_output=True, text=True
    )
    if parse_result.returncode != 0:
        return {
            "error": "dbt parse failed - fix issues before running",
            "details": parse_result.stderr,
            "suggestion": "Check for deprecated syntax (dbt 1.9+)"
        }

    # Execute run
    cmd = ["dbt", "run", "--project-dir", project_dir]
    if select:
        cmd.extend(["--select", select])
    if exclude:
        cmd.extend(["--exclude", exclude])
    result = subprocess.run(cmd, capture_output=True, text=True)
    return {"success": result.returncode == 0, "output": result.stdout}
```

### Phase 5: Plugin Wrapper (Issue #6)

**plugins/data-platform/.claude-plugin/plugin.json:**
```json
{
  "name": "data-platform",
  "version": "1.0.0",
  "description": "Data engineering tools with pandas, PostgreSQL/PostGIS, and dbt integration",
  "author": "Leo Miranda",
  "license": "MIT",
  "hooks": "hooks/hooks.json",
  "commands": "commands/",
  "agents": "agents/",
  "mcp": ".mcp.json"
}
```

**plugins/data-platform/.mcp.json:**
```json
{
  "mcpServers": {
    "data-platform": {
      "type": "stdio",
      "command": "${CLAUDE_PLUGIN_ROOT}/mcp-servers/data-platform/.venv/bin/python",
      "args": ["-m", "mcp_server.server"],
      "cwd": "${CLAUDE_PLUGIN_ROOT}/mcp-servers/data-platform"
    }
  }
}
```

**plugins/data-platform/hooks/hooks.json:**
```json
{
  "hooks": [
    {
      "event": "SessionStart",
      "type": "command",
      "command": ["${CLAUDE_PLUGIN_ROOT}/mcp-servers/data-platform/.venv/bin/python", "-c", "from mcp_server.postgres_tools import check_connection; check_connection()"],
      "timeout": 5000,
      "onError": "warn"
    }
  ]
}
```

**Agents:**

`agents/data-ingestion.md`:
```markdown
# Data Ingestion Agent

You are a data ingestion specialist. Your role is to help users load, transform, and prepare data for analysis.

## Available Tools
- pandas: read_csv, read_parquet, read_json, filter, select, groupby, join
- postgres: pg_query, pg_execute

## Workflow
1. Understand the data source and format
2. Load data with appropriate chunking for large files
3. Transform as needed (filter, select, aggregate)
4. Store results with meaningful data_ref names
```

`agents/data-analysis.md`:
```markdown
# Data Analysis Agent

You are a data analysis specialist. Your role is to help users explore, profile, and understand their data.

## Available Tools
- pandas: describe, head, tail, list_data
- postgres: pg_tables, pg_columns
- dbt: dbt_lineage, dbt_docs_generate

## Workflow
1. List available data (list_data or pg_tables)
2. Profile data structure and statistics
3. Identify patterns and anomalies
4. 
Provide insights and recommendations +``` + +**Commands:** + +| Command | File | Description | +|---------|------|-------------| +| `/ingest` | commands/ingest.md | Load data from files or database | +| `/profile` | commands/profile.md | Generate data profile and statistics | +| `/schema` | commands/schema.md | Show database/DataFrame schema | +| `/explain` | commands/explain.md | Explain dbt model lineage | +| `/lineage` | commands/lineage.md | Visualize data dependencies | +| `/run` | commands/run.md | Execute dbt models | + +### Phase 6: Documentation & Integration + +**Files to update:** +- `.claude-plugin/marketplace.json` - Add data-platform plugin entry +- `CHANGELOG.md` - Add v4.0.0 section under [Unreleased] +- `README.md` - Update plugin table + +**Files to create:** +- `plugins/data-platform/README.md` +- `plugins/data-platform/claude-md-integration.md` +- `mcp-servers/data-platform/README.md` + +## Sprint Structure (projman) + +**Milestone:** Sprint 1 - data-platform Plugin (v4.0.0) + +### Gitea Issues to Create + +| # | Title | Labels | Effort | +|---|-------|--------|--------| +| 1 | [Sprint 01] feat: MCP server foundation and config | Type/Feature, Priority/High, Complexity/Medium, Effort/M, Tech/Python, Component/Backend | 1-2 days | +| 2 | [Sprint 01] feat: Arrow IPC data registry with memory limits | Type/Feature, Priority/High, Complexity/Medium, Effort/M, Tech/Python, Component/Backend | 1-2 days | +| 3 | [Sprint 01] feat: pandas-mcp core data operations (14 tools) | Type/Feature, Priority/High, Complexity/Complex, Effort/L, Tech/Python, Component/Backend | 3-5 days | +| 4 | [Sprint 01] feat: postgres-mcp database tools with PostGIS | Type/Feature, Priority/High, Complexity/Complex, Effort/L, Tech/Python, Tech/PostgreSQL, Component/Database | 3-5 days | +| 5 | [Sprint 01] feat: dbt-mcp build tools with pre-validation | Type/Feature, Priority/High, Complexity/Complex, Effort/L, Tech/Python, Component/Backend | 3-5 days | +| 6 | [Sprint 01] feat: Plugin wrapper, commands, and agents | Type/Feature, Priority/Medium, Complexity/Medium, Effort/M, Component/Docs | 1-2 days | +| 7 | [Sprint 01] docs: Documentation and marketplace integration | Type/Documentation, Priority/Medium, Complexity/Simple, Effort/S, Component/Docs | 2-4 hours | + +### Issue Dependencies + +``` +#1 (foundation) ─┬─> #2 (data registry) + │ + ├─> #3 (pandas-mcp) ──┐ + │ │ + ├─> #4 (postgres-mcp) ├─> #6 (plugin wrapper) ─> #7 (docs) + │ │ + └─> #5 (dbt-mcp) ─────┘ +``` + +**Parallel Execution Batches:** +1. Batch 1: #1 (foundation) +2. Batch 2: #2, #3, #4, #5 (can run in parallel after foundation) +3. Batch 3: #6 (plugin wrapper - needs all tools complete) +4. Batch 4: #7 (docs - final) + +## Verification Steps + +1. **MCP Server starts:** + ```bash + cd mcp-servers/data-platform + python -m venv .venv + source .venv/bin/activate + pip install -r requirements.txt + python -m mcp_server.server + ``` + +2. **Tools are registered:** + - Start Claude Code in a test project + - Run `/ingest` command + - Verify MCP tools appear in tool list + +3. **Data persistence:** + - Load a CSV file with `/ingest` + - Run multiple commands referencing the data_ref + - Verify data persists across tool calls + +4. **PostgreSQL connection:** + - Configure `~/.config/claude/postgres.env` + - Start new session + - Verify SessionStart hook shows connection status (warning if unavailable) + +5. 
**dbt validation:** + - Run `/run` on a dbt project with deprecated syntax + - Verify pre-execution validation catches issues + - Fix syntax and re-run successfully + +6. **Validation script:** + ```bash + ./scripts/validate-marketplace.sh + ``` + +## Dependencies + +``` +# requirements.txt +mcp>=1.0.0 +pandas>=2.0.0 +pyarrow>=14.0.0 +asyncpg>=0.29.0 +geoalchemy2>=0.14.0 +python-dotenv>=1.0.0 +dbt-core>=1.9.0 +dbt-postgres>=1.9.0 +``` + +## Out of Scope (v4.1.0+) + +- Integration with projman sprint tracking +- Cross-plugin DataFrame sharing +- Visualization components (deferred to v5.0.0) +- Advanced dbt features (seeds, snapshots, exposures)
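
As a closing illustration of the `data_ref` mechanism that verification step 3 exercises, a minimal round trip through the Phase 1 `DataStore` (pure Python, no MCP layer involved; the sample table is made up):

```python
import pyarrow as pa

from mcp_server.data_store import DataStore  # path per the file structure above

# Build a small Arrow table and register it under an explicit name.
table = pa.table({"id": [1, 2, 3], "city": ["Porto", "Lisbon", "Braga"]})
store = DataStore.get_instance()
ref = store.store(table, name="cities")   # -> "cities"

# A later tool call can retrieve the same table by its data_ref.
assert store.get(ref).num_rows == 3
print(store.list_refs())                  # [{'ref': 'cities', 'rows': 3, 'cols': 2}]
```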