Add "Change V04.0.0: Proposal (Implementation 1)"
452
Change-V04.0.0%3A-Proposal-%28Implementation-1%29.md
Normal file
452
Change-V04.0.0%3A-Proposal-%28Implementation-1%29.md
Normal file
@@ -0,0 +1,452 @@
|
|||||||
|
# data-platform Plugin Implementation Plan (v4.0.0)

> **Origin:** [Change V04.0.0: Proposal](Change-V04.0.0:-Proposal)
> **Status:** Implemented
> **Date:** 2026-01-25

---

## Overview

Implement a new `data-platform` plugin for leo-claude-mktplace that addresses data workflow issues encountered in the personal-portfolio project:

- Lost data after multiple interactions (solved by Arrow IPC data_ref passing)
- dbt 1.9+ syntax deprecation (solved by pre-execution validation with `dbt parse`)
- Ungraceful PostgreSQL error handling (solved by SessionStart hook with warnings)

## Architecture Decisions

| Decision | Choice |
|----------|--------|
| Data Passing | Arrow IPC with data_ref |
| DB Auth | Environment variables (~/.config/claude/postgres.env) |
| dbt Discovery | Auto-detect + explicit override |
| dbt Validation | Pre-execution (`dbt parse`) |
| Plugin Structure | Single plugin, 3 MCP servers |
| Server Location | Root mcp-servers/ |
| Memory Management | 100k row limit with chunking |
| PostGIS Support | Yes, with geoalchemy2 |
| Agent Model | 2 agents (Ingestion + Analysis) |
| Commands | Core 6 |
| Startup Hook | Graceful DB warning (non-blocking) |
| MCP Framework | Manual SDK (following gitea pattern) |

## File Structure

```
mcp-servers/
└── data-platform/
    ├── mcp_server/
    │   ├── __init__.py
    │   ├── server.py          # Main MCP server with routing
    │   ├── config.py          # Hybrid config (system + project)
    │   ├── data_store.py      # Arrow IPC DataFrame registry
    │   ├── pandas_tools.py    # pandas tool implementations
    │   ├── postgres_tools.py  # PostgreSQL/PostGIS tools
    │   └── dbt_tools.py       # dbt CLI wrapper tools
    ├── requirements.txt
    ├── pyproject.toml
    └── README.md

plugins/
└── data-platform/
    ├── .claude-plugin/
    │   └── plugin.json
    ├── .mcp.json
    ├── mcp-servers/
    │   └── data-platform -> ../../../mcp-servers/data-platform  # symlink
    ├── commands/
    │   ├── ingest.md          # /ingest command
    │   ├── profile.md         # /profile command
    │   ├── schema.md          # /schema command
    │   ├── explain.md         # /explain command
    │   ├── lineage.md         # /lineage command
    │   └── run.md             # /run command
    ├── agents/
    │   ├── data-ingestion.md  # Data loading and transformation
    │   └── data-analysis.md   # Exploration and profiling
    ├── hooks/
    │   └── hooks.json         # SessionStart DB check
    ├── README.md
    └── claude-md-integration.md
```

## Implementation Phases

### Phase 1: Foundation (Issues #1-2)

**Files to create:**
- `mcp-servers/data-platform/mcp_server/__init__.py`
- `mcp-servers/data-platform/mcp_server/config.py`
- `mcp-servers/data-platform/mcp_server/data_store.py`
- `mcp-servers/data-platform/mcp_server/server.py` (skeleton)
- `mcp-servers/data-platform/requirements.txt`
- `mcp-servers/data-platform/pyproject.toml`

**config.py pattern** (from gitea):
```python
import os
from pathlib import Path

from dotenv import load_dotenv


def load_config():
    # System-level credentials (shared across projects)
    system_env = Path.home() / ".config/claude/postgres.env"
    if system_env.exists():
        load_dotenv(system_env)

    # Project-level settings take precedence over system-level ones
    project_env = Path.cwd() / ".env"
    if project_env.exists():
        load_dotenv(project_env, override=True)

    return {
        "postgres_url": os.getenv("POSTGRES_URL"),
        "dbt_project_dir": os.getenv("DBT_PROJECT_DIR"),
        "dbt_profiles_dir": os.getenv("DBT_PROFILES_DIR"),
    }
```

**data_store.py** (Arrow IPC registry):
```python
import uuid
from typing import Dict, Optional

import pyarrow as pa


class DataStore:
    """Singleton registry mapping data_ref strings to Arrow tables."""

    _instance = None
    _dataframes: Dict[str, pa.Table] = {}

    @classmethod
    def get_instance(cls):
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance

    def store(self, df: pa.Table, name: Optional[str] = None) -> str:
        # Use the caller-supplied name or generate a short unique ref
        data_ref = name or f"df_{uuid.uuid4().hex[:8]}"
        self._dataframes[data_ref] = df
        return data_ref

    def get(self, data_ref: str) -> Optional[pa.Table]:
        return self._dataframes.get(data_ref)

    def list_refs(self) -> list:
        return [{"ref": k, "rows": v.num_rows, "cols": v.num_columns}
                for k, v in self._dataframes.items()]
```
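
Rounding out Phase 1, `server.py` needs a thin stdio entry point that exposes the tool modules. A minimal sketch assuming the low-level `mcp` Python SDK (the "Manual SDK" pattern named in the Architecture Decisions table); the single `list_data` tool shown here is illustrative, not the final routing:

```python
import asyncio

import mcp.types as types
from mcp.server import Server
from mcp.server.stdio import stdio_server

from .data_store import DataStore

server = Server("data-platform")


@server.list_tools()
async def list_tools() -> list[types.Tool]:
    # The real implementation would aggregate tool definitions from
    # pandas_tools, postgres_tools, and dbt_tools.
    return [
        types.Tool(
            name="list_data",
            description="List all stored DataFrames",
            inputSchema={"type": "object", "properties": {}},
        ),
    ]


@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[types.TextContent]:
    if name == "list_data":
        refs = DataStore.get_instance().list_refs()
        return [types.TextContent(type="text", text=str(refs))]
    raise ValueError(f"Unknown tool: {name}")


async def main():
    async with stdio_server() as (read_stream, write_stream):
        await server.run(read_stream, write_stream,
                         server.create_initialization_options())


if __name__ == "__main__":
    asyncio.run(main())
```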

### Phase 2: pandas-mcp Tools (Issue #3)

**Tools to implement in pandas_tools.py:**

| Tool | Description |
|------|-------------|
| `read_csv` | Load CSV with optional chunking |
| `read_parquet` | Load Parquet files |
| `read_json` | Load JSON/JSONL files |
| `to_csv` | Export DataFrame to CSV |
| `to_parquet` | Export DataFrame to Parquet |
| `describe` | Statistical summary |
| `head` | First N rows |
| `tail` | Last N rows |
| `filter` | Filter rows by condition |
| `select` | Select columns |
| `groupby` | Group and aggregate |
| `join` | Join two DataFrames |
| `list_data` | List all stored DataFrames |
| `drop_data` | Remove DataFrame from store |

**Memory management:**
```python
import pandas as pd
import pyarrow as pa

from .data_store import DataStore

MAX_ROWS = 100_000


def read_csv(file_path: str, chunk_size: int = None) -> dict:
    # chunk_size handling is sketched separately below
    df = pd.read_csv(file_path)
    if len(df) > MAX_ROWS:
        # Refuse to register oversized frames; return a preview instead
        return {
            "warning": f"DataFrame has {len(df)} rows, exceeds {MAX_ROWS} limit",
            "suggestion": f"Use chunk_size={MAX_ROWS} for chunked processing",
            "preview": df.head(100).to_dict(),
        }
    # Convert to Arrow and store
    table = pa.Table.from_pandas(df)
    data_ref = DataStore.get_instance().store(table)
    return {"data_ref": data_ref, "rows": len(df), "columns": list(df.columns)}
```
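
When a file exceeds the limit, `chunk_size` can be honoured by streaming through pandas' `chunksize` iterator and registering at most `MAX_ROWS` rows. A possible shape (the helper name and truncation policy are illustrative, not part of the plan):

```python
import pandas as pd
import pyarrow as pa

from .data_store import DataStore
from .pandas_tools import MAX_ROWS


def read_csv_chunked(file_path: str, chunk_size: int = MAX_ROWS) -> dict:
    """Stream a large CSV and register at most MAX_ROWS rows."""
    kept = []
    rows_seen = 0
    for chunk in pd.read_csv(file_path, chunksize=chunk_size):
        rows_seen += len(chunk)
        remaining = MAX_ROWS - sum(len(c) for c in kept)
        if remaining > 0:
            kept.append(chunk.head(remaining))
    df = pd.concat(kept, ignore_index=True)
    data_ref = DataStore.get_instance().store(pa.Table.from_pandas(df))
    return {
        "data_ref": data_ref,
        "rows": len(df),
        "total_rows_in_file": rows_seen,
        "truncated": rows_seen > MAX_ROWS,
    }
```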

### Phase 3: postgres-mcp Tools (Issue #4)

**Tools to implement in postgres_tools.py:**

| Tool | Description |
|------|-------------|
| `pg_connect` | Test connection and return status |
| `pg_query` | Execute SELECT, return as data_ref |
| `pg_execute` | Execute INSERT/UPDATE/DELETE |
| `pg_tables` | List all tables in schema |
| `pg_columns` | Get column info for table |
| `pg_schemas` | List all schemas |
| `st_tables` | List PostGIS-enabled tables |
| `st_geometry_type` | Get geometry type of column |
| `st_srid` | Get SRID of geometry column |
| `st_extent` | Get bounding box of geometries |

**asyncpg implementation:**
```python
import asyncpg
import pandas as pd
import pyarrow as pa
from geoalchemy2 import Geometry  # reserved for PostGIS (st_*) column handling

from .config import load_config
from .data_store import DataStore
from .pandas_tools import MAX_ROWS


async def pg_query(query: str, params: list = None) -> dict:
    config = load_config()
    conn = await asyncpg.connect(config["postgres_url"])
    try:
        rows = await conn.fetch(query, *(params or []))
        df = pd.DataFrame([dict(r) for r in rows])
        if len(df) > MAX_ROWS:
            # store_truncated: module helper (not shown) that keeps only the first MAX_ROWS rows
            return {"warning": "Result truncated", "data_ref": store_truncated(df)}
        table = pa.Table.from_pandas(df)
        data_ref = DataStore.get_instance().store(table)
        return {"data_ref": data_ref, "rows": len(df)}
    finally:
        await conn.close()
```
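
The `st_*` tools can be plain SQL over the PostGIS catalog; no ORM layer is needed for reads. A sketch of `st_tables` and `st_extent`, assuming the standard `geometry_columns` view and `ST_Extent` aggregate (error handling and identifier quoting are simplified):

```python
import asyncpg

from .config import load_config


async def st_tables() -> list[dict]:
    """List PostGIS-enabled tables via the geometry_columns catalog view."""
    config = load_config()
    conn = await asyncpg.connect(config["postgres_url"])
    try:
        rows = await conn.fetch(
            "SELECT f_table_schema, f_table_name, f_geometry_column, srid, type "
            "FROM geometry_columns"
        )
        return [dict(r) for r in rows]
    finally:
        await conn.close()


async def st_extent(table: str, geom_column: str = "geom") -> dict:
    """Bounding box of all geometries in a table, as reported by ST_Extent."""
    config = load_config()
    conn = await asyncpg.connect(config["postgres_url"])
    try:
        # Identifiers cannot be bound as parameters; validate/quote them upstream.
        bbox = await conn.fetchval(
            f"SELECT ST_Extent({geom_column})::text FROM {table}"
        )
        return {"table": table, "extent": bbox}
    finally:
        await conn.close()
```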

### Phase 4: dbt-mcp Tools (Issue #5)

**Tools to implement in dbt_tools.py:**

| Tool | Description |
|------|-------------|
| `dbt_parse` | Validate project (pre-execution) |
| `dbt_run` | Run models with selection |
| `dbt_test` | Run tests |
| `dbt_build` | Run + test |
| `dbt_compile` | Compile SQL without executing |
| `dbt_ls` | List resources |
| `dbt_docs_generate` | Generate documentation |
| `dbt_lineage` | Get model dependencies |

**Pre-execution validation pattern:**
```python
import subprocess

from .config import load_config


def dbt_run(select: str = None, exclude: str = None) -> dict:
    config = load_config()
    # find_dbt_project(): auto-detect helper that locates dbt_project.yml
    project_dir = config.get("dbt_project_dir") or find_dbt_project()

    # ALWAYS validate first
    parse_result = subprocess.run(
        ["dbt", "parse", "--project-dir", project_dir],
        capture_output=True, text=True
    )
    if parse_result.returncode != 0:
        return {
            "error": "dbt parse failed - fix issues before running",
            "details": parse_result.stderr,
            "suggestion": "Check for deprecated syntax (dbt 1.9+)"
        }

    # Execute run
    cmd = ["dbt", "run", "--project-dir", project_dir]
    if select:
        cmd.extend(["--select", select])
    if exclude:
        cmd.extend(["--exclude", exclude])
    result = subprocess.run(cmd, capture_output=True, text=True)
    return {"success": result.returncode == 0, "output": result.stdout}
```
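
`dbt_lineage` does not need to shell out per edge: after a successful `dbt parse`, dependencies are available in `target/manifest.json` under each node's `depends_on.nodes`. A sketch of reading direct upstream dependencies from that artifact (path handling and node matching are simplified assumptions):

```python
import json
from pathlib import Path


def dbt_lineage(model_name: str, project_dir: str) -> dict:
    """Return the direct upstream dependencies of a dbt model."""
    manifest_path = Path(project_dir) / "target" / "manifest.json"
    if not manifest_path.exists():
        return {"error": "manifest.json not found - run dbt_parse first"}

    manifest = json.loads(manifest_path.read_text())
    # Node ids look like "model.<project>.<model_name>"
    matches = {
        node_id: node
        for node_id, node in manifest["nodes"].items()
        if node.get("name") == model_name and node_id.startswith("model.")
    }
    if not matches:
        return {"error": f"Model not found: {model_name}"}

    node_id, node = next(iter(matches.items()))
    return {
        "model": node_id,
        "depends_on": node["depends_on"]["nodes"],
    }
```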

### Phase 5: Plugin Wrapper (Issue #6)

**plugins/data-platform/.claude-plugin/plugin.json:**
```json
{
  "name": "data-platform",
  "version": "1.0.0",
  "description": "Data engineering tools with pandas, PostgreSQL/PostGIS, and dbt integration",
  "author": "Leo Miranda",
  "license": "MIT",
  "hooks": "hooks/hooks.json",
  "commands": "commands/",
  "agents": "agents/",
  "mcp": ".mcp.json"
}
```

**plugins/data-platform/.mcp.json:**
```json
{
  "mcpServers": {
    "data-platform": {
      "type": "stdio",
      "command": "${CLAUDE_PLUGIN_ROOT}/mcp-servers/data-platform/.venv/bin/python",
      "args": ["-m", "mcp_server.server"],
      "cwd": "${CLAUDE_PLUGIN_ROOT}/mcp-servers/data-platform"
    }
  }
}
```

**plugins/data-platform/hooks/hooks.json:**
```json
{
  "hooks": [
    {
      "event": "SessionStart",
      "type": "command",
      "command": ["${CLAUDE_PLUGIN_ROOT}/mcp-servers/data-platform/.venv/bin/python", "-c", "from mcp_server.postgres_tools import check_connection; check_connection()"],
      "timeout": 5000,
      "onError": "warn"
    }
  ]
}
```
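
The hook command above calls `check_connection()` in `postgres_tools`. A minimal sketch of that helper, assuming it should never block or fail the session and only print a short status line (the message text and the 3-second probe timeout are illustrative):

```python
import asyncio

import asyncpg

from .config import load_config


def check_connection() -> None:
    """Non-blocking startup check: warn if PostgreSQL is unreachable."""
    config = load_config()
    url = config.get("postgres_url")
    if not url:
        print("data-platform: POSTGRES_URL not configured - postgres tools disabled")
        return

    async def _probe():
        conn = await asyncpg.connect(url, timeout=3)
        await conn.close()

    try:
        asyncio.run(_probe())
        print("data-platform: PostgreSQL connection OK")
    except Exception as exc:
        # Warn only; the session continues without database access.
        print(f"data-platform: warning - PostgreSQL unavailable ({exc})")
```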

**Agents:**

`agents/data-ingestion.md`:
```markdown
# Data Ingestion Agent

You are a data ingestion specialist. Your role is to help users load, transform, and prepare data for analysis.

## Available Tools
- pandas: read_csv, read_parquet, read_json, filter, select, groupby, join
- postgres: pg_query, pg_execute

## Workflow
1. Understand the data source and format
2. Load data with appropriate chunking for large files
3. Transform as needed (filter, select, aggregate)
4. Store results with meaningful data_ref names
```

`agents/data-analysis.md`:
```markdown
# Data Analysis Agent

You are a data analysis specialist. Your role is to help users explore, profile, and understand their data.

## Available Tools
- pandas: describe, head, tail, list_data
- postgres: pg_tables, pg_columns
- dbt: dbt_lineage, dbt_docs_generate

## Workflow
1. List available data (list_data or pg_tables)
2. Profile data structure and statistics
3. Identify patterns and anomalies
4. Provide insights and recommendations
```

**Commands:**

| Command | File | Description |
|---------|------|-------------|
| `/ingest` | commands/ingest.md | Load data from files or database |
| `/profile` | commands/profile.md | Generate data profile and statistics |
| `/schema` | commands/schema.md | Show database/DataFrame schema |
| `/explain` | commands/explain.md | Explain dbt model lineage |
| `/lineage` | commands/lineage.md | Visualize data dependencies |
| `/run` | commands/run.md | Execute dbt models |

### Phase 6: Documentation & Integration

**Files to update:**
- `.claude-plugin/marketplace.json` - Add data-platform plugin entry
- `CHANGELOG.md` - Add v4.0.0 section under [Unreleased]
- `README.md` - Update plugin table

**Files to create:**
- `plugins/data-platform/README.md`
- `plugins/data-platform/claude-md-integration.md`
- `mcp-servers/data-platform/README.md`

## Sprint Structure (projman)

**Milestone:** Sprint 1 - data-platform Plugin (v4.0.0)

### Gitea Issues to Create

| # | Title | Labels | Effort |
|---|-------|--------|--------|
| 1 | [Sprint 01] feat: MCP server foundation and config | Type/Feature, Priority/High, Complexity/Medium, Effort/M, Tech/Python, Component/Backend | 1-2 days |
| 2 | [Sprint 01] feat: Arrow IPC data registry with memory limits | Type/Feature, Priority/High, Complexity/Medium, Effort/M, Tech/Python, Component/Backend | 1-2 days |
| 3 | [Sprint 01] feat: pandas-mcp core data operations (14 tools) | Type/Feature, Priority/High, Complexity/Complex, Effort/L, Tech/Python, Component/Backend | 3-5 days |
| 4 | [Sprint 01] feat: postgres-mcp database tools with PostGIS | Type/Feature, Priority/High, Complexity/Complex, Effort/L, Tech/Python, Tech/PostgreSQL, Component/Database | 3-5 days |
| 5 | [Sprint 01] feat: dbt-mcp build tools with pre-validation | Type/Feature, Priority/High, Complexity/Complex, Effort/L, Tech/Python, Component/Backend | 3-5 days |
| 6 | [Sprint 01] feat: Plugin wrapper, commands, and agents | Type/Feature, Priority/Medium, Complexity/Medium, Effort/M, Component/Docs | 1-2 days |
| 7 | [Sprint 01] docs: Documentation and marketplace integration | Type/Documentation, Priority/Medium, Complexity/Simple, Effort/S, Component/Docs | 2-4 hours |

### Issue Dependencies

```
#1 (foundation) ─┬─> #2 (data registry)
                 │
                 ├─> #3 (pandas-mcp) ───┐
                 │                      │
                 ├─> #4 (postgres-mcp) ─┼─> #6 (plugin wrapper) ─> #7 (docs)
                 │                      │
                 └─> #5 (dbt-mcp) ──────┘
```

**Parallel Execution Batches:**
1. Batch 1: #1 (foundation)
2. Batch 2: #2, #3, #4, #5 (can run in parallel after foundation)
3. Batch 3: #6 (plugin wrapper - needs all tools complete)
4. Batch 4: #7 (docs - final)

## Verification Steps

1. **MCP Server starts:**
   ```bash
   cd mcp-servers/data-platform
   python -m venv .venv
   source .venv/bin/activate
   pip install -r requirements.txt
   python -m mcp_server.server
   ```

2. **Tools are registered:**
   - Start Claude Code in a test project
   - Run `/ingest` command
   - Verify MCP tools appear in tool list

3. **Data persistence:**
   - Load a CSV file with `/ingest`
   - Run multiple commands referencing the data_ref
   - Verify data persists across tool calls

4. **PostgreSQL connection:**
   - Configure `~/.config/claude/postgres.env`
   - Start new session
   - Verify SessionStart hook shows connection status (warning if unavailable)

5. **dbt validation:**
   - Run `/run` on a dbt project with deprecated syntax
   - Verify pre-execution validation catches issues
   - Fix syntax and re-run successfully

6. **Validation script:**
   ```bash
   ./scripts/validate-marketplace.sh
   ```

## Dependencies

```
# requirements.txt
mcp>=1.0.0
pandas>=2.0.0
pyarrow>=14.0.0
asyncpg>=0.29.0
geoalchemy2>=0.14.0
python-dotenv>=1.0.0
dbt-core>=1.9.0
dbt-postgres>=1.9.0
```

## Out of Scope (v4.1.0+)

- Integration with projman sprint tracking
- Cross-plugin DataFrame sharing
- Visualization components (deferred to v5.0.0)
- Advanced dbt features (seeds, snapshots, exposures)